This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a99f733  PIP-25: Token based authentication (#2888)
a99f733 is described below

commit a99f733224bf25aced7302f506b67b0fd5043c73
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 28 12:42:05 2018 -0800

    PIP-25: Token based authentication (#2888)
    
    * PIP-25: Token based authentication
    
    * Addressed comments
    
    * Use Authorization header
    
    * Update to support env: data: and file: as sources for keys and tokens
    
    * Fixed cli description
    
    * Updated broker.conf
    
    * Improved consistency in reading keys and CLI tools
    
    * Fixed check for http headers
    
    * Accept rel time with no specified unit
    
    * Fixed reading data: URL
    
    * Addressed comments
    
    * Added integration tests
    
    * Addressed comments
    
    * Added CLI command to validate token against key
    
    * Fixed integration tests
    
    * Removed env:
    
    * Fixed rel time parsing
---
 bin/pulsar                                         |   3 +
 conf/broker.conf                                   |  16 +
 conf/proxy.conf                                    |  16 +
 distribution/server/src/assemble/LICENSE.bin.txt   |   5 +-
 pom.xml                                            |  17 ++
 pulsar-broker-common/pom.xml                       |  15 +
 .../AuthenticationProviderToken.java               | 126 ++++++++
 .../authentication/utils/AuthTokenUtils.java       | 106 +++++++
 .../AuthenticationProviderTokenTest.java           | 287 ++++++++++++++++++
 .../pulsar/utils/auth/tokens/TokensCliUtils.java   | 324 +++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CliCommand.java    |  25 --
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  15 +-
 .../pulsar/admin/cli/CmdPersistentTopics.java      |  12 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |   5 +-
 .../apache/pulsar/client/api/Authentication.java   |   3 +
 .../pulsar/client/api/AuthenticationFactory.java   |  35 +++
 .../client/impl/auth/AuthenticationDataToken.java  |  65 +++++
 .../pulsar/client/impl/auth/AuthenticationTls.java |   8 +
 ...enticationTls.java => AuthenticationToken.java} |  64 ++--
 .../client/impl/auth/AuthenticationTokenTest.java  | 104 +++++++
 .../client/api/url/DataURLStreamHandler.java       |  24 +-
 .../api/url/PulsarURLStreamHandlerFactory.java     |   2 +-
 .../java/org/apache/pulsar/client/api/url/URL.java |   6 +-
 .../pulsar/common/util/RelativeTimeUtil.java       |  64 ++++
 .../pulsar/common/util/RelativeTimeUtilTest.java   |  63 ++++
 site2/docs/reference-configuration.md              |  12 +-
 .../token/PulsarTokenAuthenticationBaseSuite.java  | 204 +++++++++++++
 .../auth/token/TokenAuthWithPublicPrivateKeys.java |  90 ++++++
 .../auth/token/TokenAuthWithSymmetricKeys.java     |  73 +++++
 .../docker/ContainerExecResultBytes.java           |  33 +++
 .../integration/topologies/PulsarCluster.java      |   4 +
 .../tests/integration/utils/DockerUtils.java       |  91 +++++-
 32 files changed, 1831 insertions(+), 86 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index 0bce9c4..9f4224b 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -137,6 +137,7 @@ where command is one of:
     initialize-cluster-metadata     One-time metadata initialization
     compact-topic       Run compaction against a topic
     zookeeper-shell     Open a ZK shell client
+    tokens              Utility to create authentication tokens
 
     help                This help message
 
@@ -331,6 +332,8 @@ elif [ $COMMAND == "sql" ]; then
     exec $JAVA -cp "${PRESTO_HOME}/lib/*" com.facebook.presto.cli.Presto 
--server localhost:8081 "${@}"
 elif [ $COMMAND == "sql-worker" ]; then
     exec ${PRESTO_HOME}/bin/launcher --etc-dir ${PULSAR_PRESTO_CONF} "${@}"
+elif [ $COMMAND == "tokens" ]; then
+      exec $JAVA $OPTS org.apache.pulsar.utils.auth.tokens.TokensCliUtils $@
 elif [ $COMMAND == "help" ]; then
     pulsar_help;
 else
diff --git a/conf/broker.conf b/conf/broker.conf
index 186b6d7..3c1853a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -283,6 +283,22 @@ athenzDomainNames=
 # When this parameter is not empty, unauthenticated users perform as 
anonymousUserRole
 anonymousUserRole=
 
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:;base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:;base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key
+tokenPublicKey=
+
 ### --- BookKeeper Client --- ###
 
 # Authentication plugin to use when connecting to bookies
diff --git a/conf/proxy.conf b/conf/proxy.conf
index dbac95a..ffa6c45 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -129,6 +129,22 @@ tlsHostnameVerificationEnabled=false
 # certificate isn't trusted.
 tlsRequireTrustedClientCertOnConnect=false
 
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:;base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:;base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key
+tokenPublicKey=
+
 
 ### --- Deprecated config variables --- ###
 
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index bd6e8ae..d204122 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -469,10 +469,13 @@ The Apache Software License, Version 2.0
     - io.dropwizard.metrics-metrics-jvm-3.1.0.jar
   * Prometheus
     - io.prometheus-simpleclient_httpserver-0.5.0.jar
+  * Java JSON WebTokens
+    - io.jsonwebtoken-jjwt-api-0.10.5.jar
+    - io.jsonwebtoken-jjwt-impl-0.10.5.jar
+    - io.jsonwebtoken-jjwt-jackson-0.10.5.jar
   * JavaX Injection
     - javax.inject-javax.inject-1.jar
 
-
 BSD 3-clause "New" or "Revised" License
  * Google auth library
     - com.google.auth-google-auth-library-credentials-0.9.0.jar -- 
licenses/LICENSE-google-auth-library.txt
diff --git a/pom.xml b/pom.xml
index 9f72ceb..bbb8555 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
     <debezium.version>0.8.2</debezium.version>
+    <jsonwebtoken.version>0.10.5</jsonwebtoken.version>
     <opencensus.version>0.12.3</opencensus.version>
 
     <!-- test dependencies -->
@@ -750,6 +751,22 @@ flexible messaging model and an intuitive client 
API.</description>
           </exclusion>
         </exclusions>
       </dependency>
+      
+      <dependency>
+        <groupId>io.jsonwebtoken</groupId>
+        <artifactId>jjwt-api</artifactId>
+        <version>${jsonwebtoken.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.jsonwebtoken</groupId>
+        <artifactId>jjwt-impl</artifactId>
+        <version>${jsonwebtoken.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.jsonwebtoken</groupId>
+        <artifactId>jjwt-jackson</artifactId>
+        <version>${jsonwebtoken.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>org.aspectj</groupId>
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index e1de84d..e7031f3 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -38,6 +38,12 @@
       <artifactId>pulsar-zookeeper-utils</artifactId>
       <version>${project.version}</version>
     </dependency>
+    
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.google.guava</groupId>
@@ -54,5 +60,14 @@
       <artifactId>javax.ws.rs-api</artifactId>
     </dependency>
     
+    <dependency>
+      <groupId>io.jsonwebtoken</groupId>
+      <artifactId>jjwt-impl</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.jsonwebtoken</groupId>
+      <artifactId>jjwt-jackson</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
new file mode 100644
index 0000000..4ed8868
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.JwtException;
+import io.jsonwebtoken.Jwts;
+
+import java.io.IOException;
+import java.security.Key;
+
+import javax.naming.AuthenticationException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+
+public class AuthenticationProviderToken implements AuthenticationProvider {
+
+    public final static String HTTP_HEADER_NAME = "Authorization";
+    final static String HTTP_HEADER_VALUE_PREFIX = "Bearer ";
+
+    // When symmetric key is configured
+    final static String CONF_TOKEN_SECRET_KEY = "tokenSecretKey";
+
+    // When public/private key pair is configured
+    final static String CONF_TOKEN_PUBLIC_KEY = "tokenPublicKey";
+
+    private Key validationKey;
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        this.validationKey = getValidationKey(config);
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return "token";
+    }
+
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
+        String token = null;
+
+        if (authData.hasDataFromCommand()) {
+            // Authenticate Pulsar binary connection
+            token = authData.getCommandData();
+        } else if (authData.hasDataFromHttp()) {
+            // Authenticate HTTP request. The format here should be 
compliant with RFC-6750
+            // (https://tools.ietf.org/html/rfc6750#section-2.1). Eg:
+            //
+            // Authorization: Bearer xxxxxxxxxxxxx
+            String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
+            if (httpHeaderValue == null || 
!httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
+                throw new AuthenticationException("Invalid HTTP Authorization 
header");
+            }
+
+            // Remove prefix
+            token = 
httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
+        } else {
+            throw new AuthenticationException("No token credentials passed");
+        }
+
+        // Validate the token
+        try {
+            @SuppressWarnings("unchecked")
+            Jwt<?, Claims> jwt = Jwts.parser()
+                    .setSigningKey(validationKey)
+                    .parse(token);
+
+            return jwt.getBody().getSubject();
+        } catch (JwtException e) {
+            throw new AuthenticationException("Failed to authentication token: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Try to get the validation key for tokens from several possible config 
options.
+     */
+    private static Key getValidationKey(ServiceConfiguration conf) throws 
IOException {
+        final boolean isPublicKey;
+        final String validationKeyConfig;
+
+        if (conf.getProperty(CONF_TOKEN_SECRET_KEY) != null
+                && !StringUtils.isBlank((String) 
conf.getProperty(CONF_TOKEN_SECRET_KEY))) {
+            isPublicKey = false;
+            validationKeyConfig = (String) 
conf.getProperty(CONF_TOKEN_SECRET_KEY);
+        } else if (conf.getProperty(CONF_TOKEN_PUBLIC_KEY) != null
+                && !StringUtils.isBlank((String) 
conf.getProperty(CONF_TOKEN_PUBLIC_KEY))) {
+            isPublicKey = true;
+            validationKeyConfig = (String) 
conf.getProperty(CONF_TOKEN_PUBLIC_KEY);
+        } else {
+            throw new IOException("No secret key was provided for token 
authentication");
+        }
+
+        byte[] validationKey = 
AuthTokenUtils.readKeyFromUrl(validationKeyConfig);
+
+        if (isPublicKey) {
+            return AuthTokenUtils.decodePublicKey(validationKey);
+        } else {
+            return AuthTokenUtils.decodeSecretKey(validationKey);
+        }
+    }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java
new file mode 100644
index 0000000..08ff1c7
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/utils/AuthTokenUtils.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.utils;
+
+import com.google.common.io.ByteStreams;
+
+import io.jsonwebtoken.JwtBuilder;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.io.Encoders;
+import io.jsonwebtoken.security.Keys;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.security.spec.X509EncodedKeySpec;
+import java.util.Date;
+import java.util.Optional;
+
+import javax.crypto.SecretKey;
+
+import lombok.experimental.UtilityClass;
+
+import org.apache.pulsar.client.api.url.URL;
+
+@UtilityClass
+public class AuthTokenUtils {
+
+    public static SecretKey createSecretKey(SignatureAlgorithm 
signatureAlgorithm) {
+        return Keys.secretKeyFor(signatureAlgorithm);
+    }
+
+    public static SecretKey decodeSecretKey(byte[] secretKey) {
+        return Keys.hmacShaKeyFor(secretKey);
+    }
+
+    public static PrivateKey decodePrivateKey(byte[] key) throws IOException {
+        try {
+            PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(key);
+            KeyFactory kf = KeyFactory.getInstance("RSA");
+            return kf.generatePrivate(spec);
+        } catch (Exception e) {
+            throw new IOException("Failed to decode private key", e);
+        }
+    }
+
+    public static PublicKey decodePublicKey(byte[] key) throws IOException {
+        try {
+            X509EncodedKeySpec spec = new X509EncodedKeySpec(key);
+            KeyFactory kf = KeyFactory.getInstance("RSA");
+            return kf.generatePublic(spec);
+        } catch (Exception e) {
+            throw new IOException("Failed to decode public key", e);
+        }
+    }
+
+    public static String encodeKeyBase64(Key key) {
+        return Encoders.BASE64.encode(key.getEncoded());
+    }
+
+    public static String createToken(Key signingKey, String subject, 
Optional<Date> expiryTime) {
+        JwtBuilder builder = Jwts.builder()
+                .setSubject(subject)
+                .signWith(signingKey);
+
+        if (expiryTime.isPresent()) {
+            builder.setExpiration(expiryTime.get());
+        }
+
+        return builder.compact();
+    }
+
+    public static byte[] readKeyFromUrl(String keyConfUrl) throws IOException {
+        if (keyConfUrl.startsWith("data:") || keyConfUrl.startsWith("file:")) {
+            try {
+                return ByteStreams.toByteArray((InputStream) new 
URL(keyConfUrl).getContent());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        } else {
+            // Assume the key content was passed in base64
+            return Decoders.BASE64.decode(keyConfUrl);
+        }
+    }
+}
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
new file mode 100644
index 0000000..078a7b6
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.security.Keys;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivateKey;
+import java.sql.Date;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+import javax.naming.AuthenticationException;
+
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.testng.annotations.Test;
+
+public class AuthenticationProviderTokenTest {
+
+    @Test
+    public void testInvalidInitialize() throws Exception {
+        AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+
+        try {
+            provider.initialize(new ServiceConfiguration());
+            fail("should have failed");
+        } catch (IOException e) {
+            // Expected, secret key was not defined
+        }
+
+        provider.close();
+    }
+
+    @Test
+    public void testSerializeSecretKey() throws Exception {
+        SecretKey secretKey = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        String token = Jwts.builder()
+                .setSubject("my-test-subject")
+                .signWith(secretKey)
+                .compact();
+
+        @SuppressWarnings("unchecked")
+        Jwt<?, Claims> jwt = Jwts.parser()
+                
.setSigningKey(AuthTokenUtils.decodeSecretKey(secretKey.getEncoded()))
+                .parse(token);
+
+        System.out.println("Subject: " + jwt.getBody().getSubject());
+    }
+
+    @Test
+    public void testSerializeKeyPair() throws Exception {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+
+        String privateKey = 
AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
+        String publicKey = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+
+        String token = 
AuthTokenUtils.createToken(AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKey)),
+                "my-test-subject",
+                Optional.empty());
+
+        @SuppressWarnings("unchecked")
+        Jwt<?, Claims> jwt = Jwts.parser()
+                
.setSigningKey(AuthTokenUtils.decodePublicKey(Decoders.BASE64.decode(publicKey)))
+                .parse(token);
+
+        System.out.println("Subject: " + jwt.getBody().getSubject());
+    }
+
+    @Test
+    public void testAuthSecretKey() throws Exception {
+        SecretKey secretKey = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+        assertEquals(provider.getAuthMethodName(), "token");
+
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY,
+                AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        try {
+            provider.authenticate(new AuthenticationDataSource() {
+            });
+            fail("Should have failed");
+        } catch (AuthenticationException e) {
+            // expected, no credential passed
+        }
+
+        String token = AuthTokenUtils.createToken(secretKey, 
"my-test-subject", Optional.empty());
+
+        // Pulsar protocol auth
+        String subject = provider.authenticate(new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+        });
+        assertEquals(subject, "my-test-subject");
+
+        // HTTP protocol auth
+        provider.authenticate(new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromHttp() {
+                return true;
+            }
+
+            @Override
+            public String getHttpHeader(String name) {
+                if (name.equals("Authorization")) {
+                    return "Bearer " + token;
+                } else {
+                    throw new IllegalArgumentException("Wrong HTTP header");
+                }
+            }
+        });
+        assertEquals(subject, "my-test-subject");
+
+        // Expired token. This should be rejected by the authentication 
provider
+        String expiredToken = AuthTokenUtils.createToken(secretKey, 
"my-test-subject",
+                Optional.of(new Date(System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(1))));
+
+        // Pulsar protocol auth
+        try {
+            provider.authenticate(new AuthenticationDataSource() {
+                @Override
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                @Override
+                public String getCommandData() {
+                    return expiredToken;
+                }
+            });
+            fail("Should have failed");
+        } catch (AuthenticationException e) {
+            // expected, token was expired
+        }
+
+        provider.close();
+    }
+
+    @Test
+    public void testAuthSecretKeyFromFile() throws Exception {
+        SecretKey secretKey = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        File secretKeyFile = File.createTempFile("pular-test-secret-key-", 
".key");
+        secretKeyFile.deleteOnExit();
+        Files.write(Paths.get(secretKeyFile.toString()), 
secretKey.getEncoded());
+
+        AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY, 
"file://" + secretKeyFile.toString());
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        String token = AuthTokenUtils.createToken(secretKey, 
"my-test-subject", Optional.empty());
+
+        // Pulsar protocol auth
+        String subject = provider.authenticate(new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+        });
+        assertEquals(subject, "my-test-subject");
+        provider.close();
+    }
+
+    @Test
+    public void testAuthSecretKeyFromDataBase64() throws Exception {
+        SecretKey secretKey = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY,
+                "data:;base64," + AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        String token = AuthTokenUtils.createToken(secretKey, 
"my-test-subject", Optional.empty());
+
+        // Pulsar protocol auth
+        String subject = provider.authenticate(new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+        });
+        assertEquals(subject, "my-test-subject");
+        provider.close();
+    }
+
+    @Test
+    public void testAuthSecretKeyPair() throws Exception {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+
+        String privateKeyStr = 
AuthTokenUtils.encodeKeyBase64(keyPair.getPrivate());
+        String publicKeyStr = 
AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+
+        AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        // Use public key for validation
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_PUBLIC_KEY, 
publicKeyStr);
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        // Use private key to generate token
+        PrivateKey privateKey = 
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr));
+        String token = AuthTokenUtils.createToken(privateKey, 
"my-test-subject", Optional.empty());
+
+        // Pulsar protocol auth
+        String subject = provider.authenticate(new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+        });
+        assertEquals(subject, "my-test-subject");
+
+        provider.close();
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
new file mode 100644
index 0000000..19e71f3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils.auth.tokens;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Charsets;
+
+import io.jsonwebtoken.Claims;
+import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.JwtException;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.io.Decoders;
+import io.jsonwebtoken.io.Encoders;
+import io.jsonwebtoken.security.Keys;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Key;
+import java.security.KeyPair;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+import javax.naming.AuthenticationException;
+
+import lombok.Cleanup;
+
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
+
+public class TokensCliUtils {
+
+    /**
+     * Options that apply to the tool as a whole rather than to one sub-command.
+     */
+    public static class Arguments {
+        @Parameter(
+                names = { "-h", "--help" },
+                description = "Show this help message")
+        private boolean help;
+    }
+
+    /**
+     * Sub-command that generates a new secret key for the chosen signature
+     * algorithm and writes it either to a file or to standard output.
+     *
+     * The key is emitted as raw bytes unless {@code --base64} is given, in
+     * which case the base64 text of the key is emitted instead.
+     */
+    @Parameters(commandDescription = "Create a new secret key")
+    public static class CommandCreateSecretKey {
+        @Parameter(names = { "-a", "--signature-algorithm" },
+                description = "The signature algorithm for the new secret key.")
+        SignatureAlgorithm algorithm = SignatureAlgorithm.HS256;
+
+        @Parameter(names = { "-o", "--output" },
+                description = "Write the secret key to a file instead of stdout")
+        String outputFile;
+
+        @Parameter(names = { "-b", "--base64" },
+                description = "Encode the key in base64")
+        boolean base64 = false;
+
+        /**
+         * Creates the key and writes it to {@code outputFile}, or to stdout
+         * when no output file was requested.
+         *
+         * @throws IOException if the key cannot be written
+         */
+        public void run() throws IOException {
+            SecretKey secretKey = AuthTokenUtils.createSecretKey(algorithm);
+            byte[] encoded = secretKey.getEncoded();
+
+            if (base64) {
+                // Pin the charset so the output bytes never depend on the
+                // JVM's platform default encoding
+                encoded = Encoders.BASE64.encode(encoded).getBytes(Charsets.UTF_8);
+            }
+
+            if (outputFile != null) {
+                Files.write(Paths.get(outputFile), encoded);
+            } else {
+                System.out.write(encoded);
+                // Raw bytes bypass println(); flush so a pipe sees the full key
+                System.out.flush();
+            }
+        }
+    }
+
+    /**
+     * Sub-command that generates a public/private key pair for the chosen
+     * signature algorithm and writes each key to its own file.
+     *
+     * Both files receive the bytes returned by the key's {@code getEncoded()}.
+     */
+    @Parameters(commandDescription = "Create a new pair of public/private keys")
+    public static class CommandCreateKeyPair {
+        @Parameter(names = { "-a", "--signature-algorithm" },
+                description = "The signature algorithm for the new key pair.")
+        SignatureAlgorithm algorithm = SignatureAlgorithm.RS256;
+
+        @Parameter(names = { "--output-private-key" },
+                description = "File where to write the private key", required = true)
+        String privateKeyFile;
+        @Parameter(names = { "--output-public-key" },
+                description = "File where to write the public key", required = true)
+        String publicKeyFile;
+
+        /**
+         * Generates the pair and writes the public key first, then the private key.
+         *
+         * @throws IOException if either file cannot be written
+         */
+        public void run() throws IOException {
+            KeyPair pair = Keys.keyPairFor(algorithm);
+
+            Files.write(Paths.get(publicKeyFile), pair.getPublic().getEncoded());
+            Files.write(Paths.get(privateKeyFile), pair.getPrivate().getEncoded());
+        }
+    }
+
+    /**
+     * Sub-command that signs a new token for a subject and prints it to stdout.
+     *
+     * Exactly one signing key is expected: either a secret key or a private
+     * key. An optional relative expiry is counted from the time of the call.
+     */
+    @Parameters(commandDescription = "Create a new token")
+    public static class CommandCreateToken {
+
+        @Parameter(names = { "-s", "--subject" }, required = true,
+                description = "Specify the 'subject' or 'principal' associate with this token")
+        private String subject;
+
+        @Parameter(names = { "-e", "--expiry-time" },
+                description = "Relative expiry time for the token (eg: 1h, 3d, 10y)."
+                        + " (m=minutes) Default: no expiration")
+        private String expiryTime;
+
+        @Parameter(names = { "-sk", "--secret-key" },
+                description = "Pass the secret key for signing the token."
+                        + " This can either be: data:, file:, etc..")
+        private String secretKey;
+
+        @Parameter(names = { "-pk", "--private-key" },
+                description = "Pass the private key for signing the token."
+                        + " This can either be: data:, file:, etc..")
+        private String privateKey;
+
+        public void run() throws Exception {
+            final boolean hasSecretKey = secretKey != null;
+            final boolean hasPrivateKey = privateKey != null;
+
+            if (!hasSecretKey && !hasPrivateKey) {
+                System.err.println(
+                        "Either --secret-key or --private-key needs to be passed for signing a token");
+                System.exit(1);
+            } else if (hasSecretKey && hasPrivateKey) {
+                System.err.println(
+                        "Only one of --secret-key and --private-key needs to be passed for signing a token");
+                System.exit(1);
+            }
+
+            // A private key selects the asymmetric path; otherwise sign with the secret
+            final Key signingKey;
+            if (hasPrivateKey) {
+                signingKey = AuthTokenUtils.decodePrivateKey(
+                        AuthTokenUtils.readKeyFromUrl(privateKey));
+            } else {
+                signingKey = AuthTokenUtils.decodeSecretKey(
+                        AuthTokenUtils.readKeyFromUrl(secretKey));
+            }
+
+            // No --expiry-time means the token is created without an expiry date
+            final Optional<Date> expiry;
+            if (expiryTime == null) {
+                expiry = Optional.empty();
+            } else {
+                long relativeSeconds = RelativeTimeUtil.parseRelativeTimeInSeconds(expiryTime);
+                long relativeMillis = TimeUnit.SECONDS.toMillis(relativeSeconds);
+                expiry = Optional.of(new Date(System.currentTimeMillis() + relativeMillis));
+            }
+
+            System.out.println(AuthTokenUtils.createToken(signingKey, subject, expiry));
+        }
+    }
+
+    /**
+     * Sub-command that prints the decoded header and payload of a token.
+     *
+     * The token is taken from the first available source, in this order: the
+     * positional argument, standard input ({@code --stdin}), a file
+     * ({@code --token-file}), then the {@code TOKEN} environment variable.
+     * The signature is not checked by this command.
+     */
+    @Parameters(commandDescription = "Show the content of token")
+    public static class CommandShowToken {
+
+        @Parameter(description = "The token string", arity = 1)
+        private java.util.List<String> args;
+
+        @Parameter(names = { "-i", "--stdin" },
+                description = "Read token from standard input")
+        private Boolean stdin = false;
+
+        @Parameter(names = { "-f", "--token-file" },
+                description = "Read token from a file")
+        private String tokenFile;
+
+        /**
+         * Prints the base64url-decoded first and second token sections,
+         * separated by a {@code ---} line. Exits with status 1 when no token
+         * is available or when it has fewer than two dot-separated sections.
+         */
+        public void run() throws Exception {
+            String token;
+            if (args != null) {
+                token = args.get(0);
+            } else if (stdin) {
+                @Cleanup
+                BufferedReader r = new BufferedReader(
+                        new InputStreamReader(System.in, Charsets.UTF_8));
+                token = r.readLine();
+            } else if (tokenFile != null) {
+                token = new String(Files.readAllBytes(Paths.get(tokenFile)), Charsets.UTF_8);
+            } else if (System.getenv("TOKEN") != null) {
+                token = System.getenv("TOKEN");
+            } else {
+                System.err.println(
+                        "Token needs to be either passed as an argument or through `--stdin`,"
+                                + " `--token-file` or by the `TOKEN` environment variable");
+                System.exit(1);
+                return;
+            }
+
+            // readLine() returns null on an empty stdin, and a string without
+            // a '.' has no payload section: reject both instead of crashing
+            String[] parts = token == null ? new String[0] : token.trim().split("\\.");
+            if (parts.length < 2) {
+                System.err.println(
+                        "Invalid token: expected at least a header and a payload separated by '.'");
+                System.exit(1);
+                return;
+            }
+
+            // Decode with an explicit charset rather than the platform default
+            System.out.println(new String(Decoders.BASE64URL.decode(parts[0]), Charsets.UTF_8));
+            System.out.println("---");
+            System.out.println(new String(Decoders.BASE64URL.decode(parts[1]), Charsets.UTF_8));
+        }
+    }
+
+    /**
+     * Sub-command that validates a token against a key and prints its body.
+     *
+     * Exactly one validation key is expected: either a secret key or a public
+     * key. The token is taken from the first available source, in this order:
+     * the positional argument, standard input ({@code --stdin}), a file
+     * ({@code --token-file}), then the {@code TOKEN} environment variable.
+     */
+    @Parameters(commandDescription = "Validate a token against a key")
+    public static class CommandValidateToken {
+
+        @Parameter(description = "The token string", arity = 1)
+        private java.util.List<String> args;
+
+        @Parameter(names = { "-i", "--stdin" },
+                description = "Read token from standard input")
+        private Boolean stdin = false;
+
+        @Parameter(names = { "-f", "--token-file" },
+                description = "Read token from a file")
+        private String tokenFile;
+
+        @Parameter(names = { "-sk", "--secret-key" },
+                description = "Pass the secret key for validating the token."
+                        + " This can either be: data:, file:, etc..")
+        private String secretKey;
+
+        @Parameter(names = { "-pk", "--public-key" },
+                description = "Pass the public key for validating the token."
+                        + " This can either be: data:, file:, etc..")
+        private String publicKey;
+
+        /**
+         * Parses the token with the supplied key and prints the parsed body.
+         * Any failure raised by the parser propagates to the caller.
+         */
+        public void run() throws Exception {
+            if (secretKey == null && publicKey == null) {
+                System.err.println(
+                        "Either --secret-key or --public-key needs to be passed for validating a token");
+                System.exit(1);
+            } else if (secretKey != null && publicKey != null) {
+                System.err.println(
+                        "Only one of --secret-key and --public-key needs to be passed for validating a token");
+                System.exit(1);
+            }
+
+            String token;
+            if (args != null) {
+                token = args.get(0);
+            } else if (stdin) {
+                @Cleanup
+                BufferedReader r = new BufferedReader(
+                        new InputStreamReader(System.in, Charsets.UTF_8));
+                token = r.readLine();
+            } else if (tokenFile != null) {
+                token = new String(Files.readAllBytes(Paths.get(tokenFile)), Charsets.UTF_8);
+            } else if (System.getenv("TOKEN") != null) {
+                token = System.getenv("TOKEN");
+            } else {
+                System.err.println(
+                        "Token needs to be either passed as an argument or through `--stdin`,"
+                                + " `--token-file` or by the `TOKEN` environment variable");
+                System.exit(1);
+                return;
+            }
+
+            // readLine() returns null when stdin is empty
+            if (token == null) {
+                System.err.println("No token could be read from standard input");
+                System.exit(1);
+                return;
+            }
+            // Files and shells commonly append a newline; a compact token
+            // never contains whitespace, so dropping it is always safe
+            token = token.trim();
+
+            Key validationKey;
+
+            if (publicKey != null) {
+                byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(publicKey);
+                validationKey = AuthTokenUtils.decodePublicKey(encodedKey);
+            } else {
+                byte[] encodedKey = AuthTokenUtils.readKeyFromUrl(secretKey);
+                validationKey = AuthTokenUtils.decodeSecretKey(encodedKey);
+            }
+
+            // Validate the token
+            @SuppressWarnings("unchecked")
+            Jwt<?, Claims> jwt = Jwts.parser()
+                    .setSigningKey(validationKey)
+                    .parse(token);
+
+            System.out.println(jwt.getBody());
+        }
+    }
+
+    /**
+     * Entry point of the tokens tool: registers every sub-command, parses the
+     * command line and runs the selected sub-command.
+     *
+     * Usage is printed and the process exits with status 1 when help is
+     * requested, no sub-command is given, or parsing fails.
+     */
+    public static void main(String[] args) throws Exception {
+        Arguments globalOptions = new Arguments();
+        JCommander cli = new JCommander(globalOptions);
+
+        CommandCreateSecretKey createSecretKey = new CommandCreateSecretKey();
+        CommandCreateKeyPair createKeyPair = new CommandCreateKeyPair();
+        CommandCreateToken createToken = new CommandCreateToken();
+        CommandShowToken showToken = new CommandShowToken();
+        CommandValidateToken validateToken = new CommandValidateToken();
+
+        // Registration order is kept as-is
+        cli.addCommand("create-secret-key", createSecretKey);
+        cli.addCommand("create-key-pair", createKeyPair);
+        cli.addCommand("create", createToken);
+        cli.addCommand("show", showToken);
+        cli.addCommand("validate", validateToken);
+
+        try {
+            cli.parse(args);
+
+            boolean nothingToRun = cli.getParsedCommand() == null;
+            if (globalOptions.help || nothingToRun) {
+                cli.usage();
+                System.exit(1);
+            }
+        } catch (Exception e) {
+            cli.usage();
+            System.err.println(e);
+            System.exit(1);
+        }
+
+        String cmd = cli.getParsedCommand();
+
+        switch (cmd) {
+        case "create-secret-key":
+            createSecretKey.run();
+            break;
+        case "create-key-pair":
+            createKeyPair.run();
+            break;
+        case "create":
+            createToken.run();
+            break;
+        case "show":
+            showToken.run();
+            break;
+        case "validate":
+            validateToken.run();
+            break;
+        default:
+            System.err.println("Invalid command: " + cmd);
+            System.exit(1);
+        }
+    }
+}
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
index 7af1b9f..99d27d8 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CliCommand.java
@@ -102,31 +102,6 @@ abstract class CliCommand {
         }
     }
 
-    static int validateTimeString(String s) {
-        char last = s.charAt(s.length() - 1);
-        String subStr = s.substring(0, s.length() - 1);
-        switch (last) {
-        case 'm':
-        case 'M':
-            return Integer.parseInt(subStr);
-
-        case 'h':
-        case 'H':
-            return Integer.parseInt(subStr) * 60;
-
-        case 'd':
-        case 'D':
-            return Integer.parseInt(subStr) * 24 * 60;
-
-        case 'w':
-        case 'W':
-            return Integer.parseInt(subStr) * 7 * 24 * 60;
-
-        default:
-            return Integer.parseInt(s);
-        }
-    }
-
     static MessageId validateMessageIdString(String resetMessageIdStr) throws 
PulsarAdminException {
         String[] messageId = resetMessageIdStr.split(":");
         try {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 3b724f2..6650618 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import 
org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 @Parameters(commandDescription = "Operations about namespaces")
 public class CmdNamespaces extends CmdBase {
@@ -352,9 +353,16 @@ public class CmdNamespaces extends CmdBase {
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             long sizeLimit = validateSizeString(limitStr);
-            int retentionTimeInMin = validateTimeString(retentionTimeStr);
+            long retentionTimeInSec = 
RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
 
-            int retentionSizeInMB;
+            final int retentionTimeInMin;
+            if (retentionTimeInSec != -1) {
+                retentionTimeInMin = (int) 
TimeUnit.SECONDS.toMinutes(retentionTimeInSec);
+            } else {
+                retentionTimeInMin = -1;
+            }
+
+            final int retentionSizeInMB;
             if (sizeLimit != -1) {
                 retentionSizeInMB = (int) (sizeLimit / (1024 * 1024));
             } else {
@@ -901,7 +909,8 @@ public class CmdNamespaces extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            admin.namespaces().setOffloadDeleteLag(namespace, 
validateTimeString(lag), TimeUnit.MINUTES);
+            // The lag is parsed into seconds and submitted with the matching unit.
+            // NOTE(review): the removed helper read a bare number as minutes; how
+            // RelativeTimeUtil treats a unit-less value is not visible here - confirm.
+            admin.namespaces().setOffloadDeleteLag(namespace, 
RelativeTimeUtil.parseRelativeTimeInSeconds(lag),
+                    TimeUnit.SECONDS);
         }
     }
 
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index bdc8761..2314ae6 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
@@ -240,7 +240,7 @@ public class CmdPersistentTopics extends CmdBase {
 
         @Parameter(description = 
"persistent://property/cluster/namespace/topic\n", required = true)
         private java.util.List<String> params;
-        
+
         @Parameter(names = "--force", description = "Close all 
producer/consumer/replicator and delete topic forcefully")
         private boolean force = false;
 
@@ -259,7 +259,7 @@ public class CmdPersistentTopics extends CmdBase {
 
         @Parameter(names = "--force", description = "Close all 
producer/consumer/replicator and delete topic forcefully")
         private boolean force = false;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
@@ -374,7 +374,7 @@ public class CmdPersistentTopics extends CmdBase {
             
print(persistentTopics.getPartitionedInternalStats(persistentTopic));
         }
     }
-    
+
     @Parameters(commandDescription = "Skip all the messages for the 
subscription")
     private class SkipAll extends CliCommand {
         @Parameter(description = 
"persistent://property/cluster/namespace/topic", required = true)
@@ -496,8 +496,8 @@ public class CmdPersistentTopics extends CmdBase {
                 MessageId messageId = 
validateMessageIdString(resetMessageIdStr);
                 persistentTopics.resetCursor(persistentTopic, subName, 
messageId);
             } else if (isNotBlank(resetTimeStr)) {
-                int resetBackTimeInMin = validateTimeString(resetTimeStr);
-                long resetTimeInMillis = 
TimeUnit.MILLISECONDS.convert(resetBackTimeInMin, TimeUnit.MINUTES);
+                long resetTimeInMillis = TimeUnit.SECONDS
+                        
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
                 // now - go back time
                 long timestamp = System.currentTimeMillis() - 
resetTimeInMillis;
                 persistentTopics.resetCursor(persistentTopic, subName, 
timestamp);
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index a85df44..5014f14 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 @Parameters(commandDescription = "Operations on persistent topics")
 public class CmdTopics extends CmdBase {
@@ -494,8 +495,8 @@ public class CmdTopics extends CmdBase {
                 MessageId messageId = 
validateMessageIdString(resetMessageIdStr);
                 topics.resetCursor(persistentTopic, subName, messageId);
             } else if (isNotBlank(resetTimeStr)) {
-                int resetBackTimeInMin = validateTimeString(resetTimeStr);
-                long resetTimeInMillis = 
TimeUnit.MILLISECONDS.convert(resetBackTimeInMin, TimeUnit.MINUTES);
+                long resetTimeInMillis = TimeUnit.SECONDS
+                        
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
                 // now - go back time
                 long timestamp = System.currentTimeMillis() - 
resetTimeInMillis;
                 topics.resetCursor(persistentTopic, subName, timestamp);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java
index bd564f5..b3a2172 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Authentication.java
@@ -22,6 +22,9 @@ import java.io.Closeable;
 import java.io.Serializable;
 import java.util.Map;
 
+/**
+ * Interface of authentication providers.
+ */
 public interface Authentication extends Closeable, Serializable {
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
index f955a56..21060a1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/AuthenticationFactory.java
@@ -19,15 +19,50 @@
 package org.apache.pulsar.client.api;
 
 import java.util.Map;
+import java.util.function.Supplier;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 
 public final class AuthenticationFactory {
 
     /**
+     * Create an authentication provider for token based authentication.
+     * <p>
+     * The token is wrapped in a supplier that always returns this same string.
+     *
+     * @param token
+     *            the client auth token
+     * @return a new {@link AuthenticationToken} built from the given token
+     */
+    public static Authentication token(String token) {
+        return new AuthenticationToken(token);
+    }
+
+    /**
+     * Create an authentication provider for token based authentication.
+     * <p>
+     * The supplier is invoked every time the token is needed for HTTP headers
+     * or command data, so it can hand out a different value on each call.
+     *
+     * @param tokenSupplier
+     *            a supplier of the client auth token
+     * @return a new {@link AuthenticationToken} built from the given supplier
+     */
+    public static Authentication token(Supplier<String> tokenSupplier) {
+        return new AuthenticationToken(tokenSupplier);
+    }
+
+    /**
+     * Create an authentication provider for TLS based authentication.
+     *
+     * @param certFilePath
+     *            the path to the TLS client certificate
+     * @param keyFilePath
+     *            the path to the TLS client private key
+     * @return a new {@link AuthenticationTls} using the two given paths
+     */
+    public static Authentication TLS(String certFilePath, String keyFilePath) {
+        return new AuthenticationTls(certFilePath, keyFilePath);
+    }
+
+    /**
      * Create an instance of the Authentication-Plugin
      *
      * @param authPluginClassName name of the Authentication-Plugin you want 
to use
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataToken.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataToken.java
new file mode 100644
index 0000000..f04400b
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataToken.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+/**
+ * Authentication data backed by a token supplier.
+ *
+ * The token is exposed in two forms: as an {@code Authorization: Bearer ...}
+ * HTTP header and, unchanged, as command data. The supplier is asked for the
+ * token on every request for either form.
+ */
+public class AuthenticationDataToken implements AuthenticationDataProvider {
+    public static final String HTTP_HEADER_NAME = "Authorization";
+
+    private final Supplier<String> tokenSupplier;
+
+    public AuthenticationDataToken(Supplier<String> tokenSupplier) {
+        this.tokenSupplier = tokenSupplier;
+    }
+
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    @Override
+    public Set<Map.Entry<String, String>> getHttpHeaders() {
+        String bearerValue = "Bearer " + getToken();
+        Map<String, String> headers =
+                Collections.singletonMap(HTTP_HEADER_NAME, bearerValue);
+        return headers.entrySet();
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    @Override
+    public String getCommandData() {
+        return getToken();
+    }
+
+    /**
+     * Asks the supplier for the current token, wrapping anything it throws
+     * in a {@link RuntimeException}.
+     */
+    private String getToken() {
+        final String currentToken;
+        try {
+            currentToken = tokenSupplier.get();
+        } catch (Throwable cause) {
+            throw new RuntimeException("failed to get client token", cause);
+        }
+        return currentToken;
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
index b2ad875..8594879 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
@@ -41,6 +41,14 @@ public class AuthenticationTls implements Authentication, 
EncodedAuthenticationP
     private String certFilePath;
     private String keyFilePath;
 
+    /**
+     * Creates an instance without certificate or key paths; they are left
+     * unset until one of the {@code configure} methods supplies them.
+     */
+    public AuthenticationTls() {
+    }
+
+    /**
+     * Creates an instance with both file paths already set.
+     *
+     * @param certFilePath
+     *            a file path for a client certificate
+     * @param keyFilePath
+     *            a file path for a client private key
+     */
+    public AuthenticationTls(String certFilePath, String keyFilePath) {
+        this.certFilePath = certFilePath;
+        this.keyFilePath = keyFilePath;
+    }
+
     @Override
     public void close() throws IOException {
         // noop
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
similarity index 52%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
index b2ad875..9e49016 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationTls.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationToken.java
@@ -16,30 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.client.impl.auth;
 
+import com.google.common.base.Charsets;
+
 import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.AuthenticationUtil;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
 
 /**
- *
- * This plugin requires these parameters
- *
- * tlsCertFile: A file path for a client certificate. tlsKeyFile: A file path 
for a client private key.
- *
+ * Token based authentication provider.
  */
-public class AuthenticationTls implements Authentication, 
EncodedAuthenticationParameterSupport {
+public class AuthenticationToken implements Authentication, 
EncodedAuthenticationParameterSupport {
+
+    private Supplier<String> tokenSupplier;
 
-    private static final long serialVersionUID = 1L;
+    /**
+     * Creates an instance with no token supplier; {@link #configure(String)}
+     * installs one afterwards.
+     */
+    public AuthenticationToken() {
+    }
 
-    private String certFilePath;
-    private String keyFilePath;
+    /**
+     * Creates an instance whose supplier always returns the given token.
+     *
+     * @param token
+     *            the client auth token
+     */
+    public AuthenticationToken(String token) {
+        this(() -> token);
+    }
+
+    /**
+     * Creates an instance that obtains the token from the given supplier.
+     *
+     * @param tokenSupplier
+     *            a supplier of the client auth token
+     */
+    public AuthenticationToken(Supplier<String> tokenSupplier) {
+        this.tokenSupplier = tokenSupplier;
+    }
 
     @Override
     public void close() throws IOException {
@@ -48,27 +58,38 @@ public class AuthenticationTls implements Authentication, 
EncodedAuthenticationP
 
     @Override
     public String getAuthMethodName() {
-        return "tls";
+        // Fixed method name reported by this token-based provider
+        return "token";
     }
 
     @Override
     public AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
-        try {
-            return new AuthenticationDataTls(certFilePath, keyFilePath);
-        } catch (Exception e) {
-            throw new PulsarClientException(e);
-        }
+        // Only the supplier is handed over here; the token itself is fetched
+        // later, when the data object is asked for headers or command data.
+        return new AuthenticationDataToken(tokenSupplier);
     }
 
     @Override
     public void configure(String encodedAuthParamString) {
-        
setAuthParams(AuthenticationUtil.configureFromPulsar1AuthParamString(encodedAuthParamString));
+        // Interpret the whole param string as the token. If the string 
contains the notation `token:xxxxx` then strip
+        // the prefix
+        if (encodedAuthParamString.startsWith("token:")) {
+            this.tokenSupplier = () -> 
encodedAuthParamString.substring("token:".length());
+        } else if (encodedAuthParamString.startsWith("file:")) {
+            // Read token from a file
+            URI filePath = URI.create(encodedAuthParamString);
+            this.tokenSupplier = () -> {
+                try {
+                    return new String(Files.readAllBytes(Paths.get(filePath)), 
Charsets.UTF_8);
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to read token from 
file", e);
+                }
+            };
+        } else {
+            this.tokenSupplier = () -> encodedAuthParamString;
+        }
     }
 
     @Override
-    @Deprecated
     public void configure(Map<String, String> authParams) {
-        setAuthParams(authParams);
+        // Map-based parameters are ignored; the token comes only from the
+        // constructors or from configure(String).
+        // NOTE(review): a caller that configures solely through this method
+        // leaves tokenSupplier unset - confirm that is intended.
+        // noop
     }
 
     @Override
@@ -76,9 +97,4 @@ public class AuthenticationTls implements Authentication, 
EncodedAuthenticationP
         // noop
     }
 
-    private void setAuthParams(Map<String, String> authParams) {
-        certFilePath = authParams.get("tlsCertFile");
-        keyFilePath = authParams.get("tlsKeyFile");
-    }
-
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
new file mode 100644
index 0000000..b2f3a7f
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTokenTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.base.Charsets;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.testng.annotations.Test;
+
+public class AuthenticationTokenTest {
+
+    @Test
+    public void testAuthToken() throws Exception {
+        AuthenticationToken authToken = new AuthenticationToken("token-xyz");
+        assertEquals(authToken.getAuthMethodName(), "token");
+
+        AuthenticationDataProvider authData = authToken.getAuthData();
+        assertTrue(authData.hasDataFromCommand());
+        assertEquals(authData.getCommandData(), "token-xyz");
+
+        assertFalse(authData.hasDataForTls());
+        assertNull(authData.getTlsCertificates());
+        assertNull(authData.getTlsPrivateKey());
+
+        assertTrue(authData.hasDataForHttp());
+        assertEquals(authData.getHttpHeaders(),
+                Collections.singletonMap("Authorization", "Bearer 
token-xyz").entrySet());
+
+        authToken.close();
+    }
+
+    @Test
+    public void testAuthTokenConfig() throws Exception {
+        AuthenticationToken authToken = new AuthenticationToken();
+        authToken.configure("token:my-test-token-string");
+        assertEquals(authToken.getAuthMethodName(), "token");
+
+        AuthenticationDataProvider authData = authToken.getAuthData();
+        assertTrue(authData.hasDataFromCommand());
+        assertEquals(authData.getCommandData(), "my-test-token-string");
+        authToken.close();
+    }
+
+    @Test
+    public void testAuthTokenConfigFromFile() throws Exception {
+        File tokenFile = File.createTempFile("pular-test-token", ".key");
+        tokenFile.deleteOnExit();
+        FileUtils.write(tokenFile, "my-test-token-string", Charsets.UTF_8);
+
+        AuthenticationToken authToken = new AuthenticationToken();
+        authToken.configure("file://" + tokenFile);
+        assertEquals(authToken.getAuthMethodName(), "token");
+
+        AuthenticationDataProvider authData = authToken.getAuthData();
+        assertTrue(authData.hasDataFromCommand());
+        assertEquals(authData.getCommandData(), "my-test-token-string");
+
+        // Ensure if the file content changes, the token will get refreshed as 
well
+        FileUtils.write(tokenFile, "other-token", Charsets.UTF_8);
+
+        AuthenticationDataProvider authData2 = authToken.getAuthData();
+        assertTrue(authData2.hasDataFromCommand());
+        assertEquals(authData2.getCommandData(), "other-token");
+
+        authToken.close();
+    }
+
+    @Test
+    public void testAuthTokenConfigNoPrefix() throws Exception {
+        AuthenticationToken authToken = new AuthenticationToken();
+        authToken.configure("my-test-token-string");
+        assertEquals(authToken.getAuthMethodName(), "token");
+
+        AuthenticationDataProvider authData = authToken.getAuthData();
+        assertTrue(authData.hasDataFromCommand());
+        assertEquals(authData.getCommandData(), "my-test-token-string");
+        authToken.close();
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
similarity index 82%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
index f4147cd..e70dcec 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/DataURLStreamHandler.java
@@ -33,12 +33,15 @@ import java.util.regex.Pattern;
 
 public class DataURLStreamHandler extends URLStreamHandler {
 
-    class DataURLConnection extends URLConnection {
+    static class DataURLConnection extends URLConnection {
         private boolean parsed = false;
         private String contentType;
-        private String data;
+        private byte[] data;
         private URI uri;
 
+        private static final Pattern pattern = Pattern.compile(
+              
"(?<mimeType>[^;,]+)?(;(?<charset>charset=[^;,]+))?(;(?<base64>base64))?,(?<data>.+)",
 Pattern.DOTALL);
+
         protected DataURLConnection(URL url) {
             super(url);
             try {
@@ -57,20 +60,19 @@ public class DataURLStreamHandler extends URLStreamHandler {
             if (this.uri == null) {
                 throw new IOException();
             }
-            Pattern pattern = Pattern.compile(
-                    
"(?<mimeType>.+?)(;(?<charset>charset=.+?))?(;(?<base64>base64?))?,(?<data>.+)",
 Pattern.DOTALL);
+
             Matcher matcher = 
pattern.matcher(this.uri.getSchemeSpecificPart());
             if (matcher.matches()) {
                 this.contentType = matcher.group("mimeType");
-                String charset = matcher.group("charset");
-                if (charset == null) {
-                    charset = "US-ASCII";
+                if (contentType == null) {
+                    this.contentType = "application/data";
                 }
+
                 if (matcher.group("base64") == null) {
                     // Support Urlencode but not decode here because already 
decoded by URI class.
-                    this.data = new String(matcher.group("data").getBytes(), 
charset);
+                    this.data = matcher.group("data").getBytes();
                 } else {
-                    this.data = new 
String(Base64.getDecoder().decode(matcher.group("data")), charset);
+                    this.data = 
Base64.getDecoder().decode(matcher.group("data"));
                 }
             } else {
                 throw new MalformedURLException();
@@ -83,7 +85,7 @@ public class DataURLStreamHandler extends URLStreamHandler {
             long length;
             try {
                 this.connect();
-                length = this.data.length();
+                length = this.data.length;
             } catch (IOException e) {
                 length = -1;
             }
@@ -109,7 +111,7 @@ public class DataURLStreamHandler extends URLStreamHandler {
 
         public InputStream getInputStream() throws IOException {
             this.connect();
-            return new ByteArrayInputStream(this.data.getBytes());
+            return new ByteArrayInputStream(this.data);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
similarity index 95%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
index af7b668..b09d384 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/PulsarURLStreamHandlerFactory.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class PulsarURLStreamHandlerFactory implements URLStreamHandlerFactory {
-    static Map<String, Class<? extends URLStreamHandler>> handlers;
+    private static final Map<String, Class<? extends URLStreamHandler>> 
handlers;
     static {
         handlers = new HashMap<>();
         handlers.put("data", DataURLStreamHandler.class);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java 
b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
similarity index 88%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
index 4d8c367..e5246b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/url/URL.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/api/url/URL.java
@@ -26,8 +26,8 @@ import java.net.URLConnection;
 import java.net.URLStreamHandlerFactory;
 
 public class URL {
-    private static URLStreamHandlerFactory urlStreamHandlerFactory = new 
PulsarURLStreamHandlerFactory();
-    private java.net.URL url;
+    private static final URLStreamHandlerFactory urlStreamHandlerFactory = new 
PulsarURLStreamHandlerFactory();
+    private final java.net.URL url;
 
     public URL(String spec)
             throws MalformedURLException, URISyntaxException, 
InstantiationException, IllegalAccessException {
@@ -47,7 +47,7 @@ public class URL {
         return this.url.getContent();
     }
 
-    public Object getContent(Class[] classes) throws IOException {
+    public Object getContent(Class<?>[] classes) throws IOException {
         return this.url.getContent(classes);
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
new file mode 100644
index 0000000..3034960
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RelativeTimeUtil.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.util.concurrent.TimeUnit;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class RelativeTimeUtil {
+    public static long parseRelativeTimeInSeconds(String relativeTime) {
+        if (relativeTime.isEmpty()) {
+            throw new IllegalArgumentException("exipiry time cannot be empty");
+        }
+
+        int lastIndex=  relativeTime.length() - 1;
+        char lastChar = relativeTime.charAt(lastIndex);
+        final char timeUnit;
+
+        if (!Character.isAlphabetic(lastChar)) {
+            // No unit specified, assume seconds
+            timeUnit = 's';
+            lastIndex = relativeTime.length();
+        } else {
+            timeUnit = Character.toLowerCase(lastChar);
+        }
+
+        long duration = Long.parseLong(relativeTime.substring(0, lastIndex));
+
+        switch (timeUnit) {
+        case 's':
+            return duration;
+        case 'm':
+            return TimeUnit.MINUTES.toSeconds(duration);
+        case 'h':
+            return TimeUnit.HOURS.toSeconds(duration);
+        case 'd':
+            return TimeUnit.DAYS.toSeconds(duration);
+        case 'w':
+            return 7 * TimeUnit.DAYS.toSeconds(duration);
+        // No unit for months
+        case 'y':
+            return 365 * TimeUnit.DAYS.toSeconds(duration);
+        default:
+            throw new IllegalArgumentException("Invalid time unit '" + 
lastChar + "'");
+        }
+    }
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RelativeTimeUtilTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RelativeTimeUtilTest.java
new file mode 100644
index 0000000..bc44ac4
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RelativeTimeUtilTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+
+import org.testng.annotations.Test;
+
+public class RelativeTimeUtilTest {
+    @Test
+    public void testParseRelativeTime() {
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("-1"), -1);
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("7"), 7);
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("3s"), 3);
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("3S"), 3);
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("5m"), 
TimeUnit.MINUTES.toSeconds(5));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("5M"), 
TimeUnit.MINUTES.toSeconds(5));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("7h"), 
TimeUnit.HOURS.toSeconds(7));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("7H"), 
TimeUnit.HOURS.toSeconds(7));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("9d"), 
TimeUnit.DAYS.toSeconds(9));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("9D"), 
TimeUnit.DAYS.toSeconds(9));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("3w"), 7 * 
TimeUnit.DAYS.toSeconds(3));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("11y"), 365 * 
TimeUnit.DAYS.toSeconds(11));
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("11Y"), 365 * 
TimeUnit.DAYS.toSeconds(11));
+
+        // Negative interval
+        assertEquals(RelativeTimeUtil.parseRelativeTimeInSeconds("-5m"), 
-TimeUnit.MINUTES.toSeconds(5));
+
+        try {
+            RelativeTimeUtil.parseRelativeTimeInSeconds("");
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        try {
+            // Invalid time unit specified
+            RelativeTimeUtil.parseRelativeTimeInSeconds("1234x");
+            fail("should have failed");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+    }
+}
diff --git a/site2/docs/reference-configuration.md 
b/site2/docs/reference-configuration.md
index 0d23589..2fec49e 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -108,7 +108,7 @@ Pulsar brokers are responsible for handling incoming 
messages from producers, di
 |enableNonPersistentTopics| Whether non-persistent topics are enabled on the 
broker |true|
 |functionsWorkerEnabled|  Whether the Pulsar Functions worker service is 
enabled in the broker  |false|
 |zookeeperServers|  Zookeeper quorum connection string  ||
-|globalZookeeperServers|  Global Zookeeper quorum connection string || 
+|globalZookeeperServers|  Global Zookeeper quorum connection string ||
 |brokerServicePort| Broker data port  |6650|
 |brokerServicePortTls|  Broker data port for TLS  |6651|
 |webServicePort|  Port to use to server HTTP request  |8080|
@@ -142,6 +142,8 @@ Pulsar brokers are responsible for handling incoming 
messages from producers, di
 |tlsAllowInsecureConnection|  Accept untrusted TLS certificate from client  
|false|
 |tlsProtocols|Specify the tls protocols the broker will use to negotiate 
during TLS Handshake. Multiple values can be specified, separated by commas. 
Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
 |tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS 
Handshake. Multiple values can be specified, separated by commas. Example:- 
```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tokenSecretKey| Configure the secret key to be used to validate auth tokens. 
The key can be specified like: `tokenSecretKey=data:;base64,xxxxxxxxx` or 
`tokenSecretKey=file:///my/secret.key`||
+|tokenPublicKey| Configure the public key to be used to validate auth tokens. 
The key can be specified like: `tokenPublicKey=data:;base64,xxxxxxxxx` or 
`tokenPublicKey=file:///my/public.key`||
 |maxUnackedMessagesPerConsumer| Max number of unacknowledged messages allowed 
to receive messages by a consumer on a shared subscription. Broker will stop 
sending messages to consumer once, this limit reaches until consumer starts 
acknowledging messages back. Using a value of 0, is disabling unackeMessage 
limit check and consumer can receive messages without any restriction  |50000|
 |maxUnackedMessagesPerSubscription| Max number of unacknowledged messages 
allowed per shared subscription. Broker will stop dispatching messages to all 
consumers of the subscription once this limit reaches until consumer starts 
acknowledging messages back and unack count reaches to limit/2. Using a value 
of 0, is disabling unackedMessage-limit check and dispatcher can dispatch 
messages without any restriction  |200000|
 |maxConcurrentLookupRequest|  Max number of concurrent lookup request broker 
allows to throttle heavy incoming lookup traffic |50000|
@@ -386,7 +388,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) 
CLI tool can be used
 |zooKeeperSessionTimeoutMillis|   |30000|
 |serviceUrl|||
 |serviceUrlTls|||
-|brokerServiceUrl||| 
+|brokerServiceUrl|||
 |brokerServiceUrlTls|||
 |webServicePort||8080|
 |webServicePortTls||8443|
@@ -397,7 +399,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) 
CLI tool can be used
 |authorizationEnabled||false|
 |superUserRoles |||
 |brokerClientAuthenticationPlugin|||
-|brokerClientAuthenticationParameters||| 
+|brokerClientAuthenticationParameters|||
 |tlsEnabled||false|
 |tlsAllowInsecureConnection||false|
 |tlsCertificateFilePath|||
@@ -405,7 +407,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) 
CLI tool can be used
 |tlsTrustCertsFilePath|||
 
 
-## Pulsar proxy 
+## Pulsar proxy
 
 The [Pulsar proxy](concepts-architecture-overview.md#pulsar-proxy) can be 
configured in the `conf/proxy.conf` file.
 
@@ -438,6 +440,8 @@ The [Pulsar 
proxy](concepts-architecture-overview.md#pulsar-proxy) can be config
 |tlsRequireTrustedClientCertOnConnect|  Whether client certificates are 
required for TLS. Connections are rejected if the client certificate isn’t 
trusted. |false|
 |tlsProtocols|Specify the tls protocols the broker will use to negotiate 
during TLS Handshake. Multiple values can be specified, separated by commas. 
Example:- ```TLSv1.2```, ```TLSv1.1```, ```TLSv1``` ||
 |tlsCiphers|Specify the tls cipher the broker will use to negotiate during TLS 
Handshake. Multiple values can be specified, separated by commas. Example:- 
```TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256```||
+|tokenSecretKey| Configure the secret key to be used to validate auth tokens. 
The key can be specified like: `tokenSecretKey=data:;base64,xxxxxxxxx` or 
`tokenSecretKey=file:///my/secret.key`||
+|tokenPublicKey| Configure the public key to be used to validate auth tokens. 
The key can be specified like: `tokenPublicKey=data:;base64,xxxxxxxxx` or 
`tokenPublicKey=file:///my/public.key`||
 
 ## ZooKeeper
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
new file mode 100644
index 0000000..5606811
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.token;
+
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.stream.Stream;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Network;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+@Slf4j
+public abstract class PulsarTokenAuthenticationBaseSuite extends 
PulsarClusterTestBase implements ITest {
+
+    protected String superUserAuthToken;
+    protected String proxyAuthToken;
+    protected String clientAuthToken;
+
+    protected abstract void createKeysAndTokens(PulsarContainer<?> container) 
throws Exception;
+    protected abstract void configureBroker(BrokerContainer brokerContainer) 
throws Exception;
+    protected abstract void configureProxy(ProxyContainer proxyContainer) 
throws Exception;
+
+    protected static final String SUPER_USER_ROLE = "super-user";
+    protected static final String PROXY_ROLE = "proxy";
+    protected static final String REGULAR_USER_ROLE = "client";
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        // Before starting the cluster, generate the secret key and the token
+        // Use Zk container to have 1 container available before starting the 
cluster
+        try (ZKContainer<?> zkContainer = new ZKContainer<>("cli-setup")) {
+            zkContainer
+                .withNetwork(Network.newNetwork())
+                .withNetworkAliases(ZKContainer.NAME)
+                .withEnv("zkServers", ZKContainer.NAME);
+            zkContainer.start();
+
+            createKeysAndTokens(zkContainer);
+        }
+
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), 
randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .numProxies(1)
+                .clusterName(clusterName)
+                .build();
+
+        log.info("Setting up cluster {} with token authentication  and {} 
bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+
+        for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+            configureBroker(brokerContainer);
+            brokerContainer.withEnv("authenticationEnabled", "true");
+            brokerContainer.withEnv("authenticationProviders",
+                    
"org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
+            brokerContainer.withEnv("authorizationEnabled", "true");
+            brokerContainer.withEnv("superUserRoles", SUPER_USER_ROLE + "," + 
PROXY_ROLE);
+        }
+
+        ProxyContainer proxyContainer = pulsarCluster.getProxy();
+        configureProxy(proxyContainer);
+        proxyContainer.withEnv("authenticationEnabled", "true");
+        proxyContainer.withEnv("authenticationProviders",
+                
"org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
+        proxyContainer.withEnv("authorizationEnabled", "true");
+        proxyContainer.withEnv("brokerClientAuthenticationPlugin", 
AuthenticationToken.class.getName());
+        proxyContainer.withEnv("brokerClientAuthenticationParameters", 
"token:" + proxyAuthToken);
+
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup", spec.clusterName());
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    @Override
+    public String getTestName() {
+        return "token-auth-test-suite";
+    }
+
+    @Test
+    public void testPublishWithTokenAuth() throws Exception {
+        final String tenant = "token-test-tenant" + randomName(4);
+        final String namespace = tenant + "/ns-1";
+        final String topic = "persistent://" + namespace + "/topic-1";
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
+                
.authentication(AuthenticationFactory.token(superUserAuthToken))
+                .build();
+
+        try {
+        admin.tenants().createTenant(tenant,
+                new TenantInfo(Collections.singleton(REGULAR_USER_ROLE),
+                        
Collections.singleton(pulsarCluster.getClusterName())));
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        admin.namespaces().createNamespace(namespace, 
Collections.singleton(pulsarCluster.getClusterName()));
+        admin.namespaces().grantPermissionOnNamespace(namespace, 
REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class));
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .authentication(AuthenticationFactory.token(clientAuthToken))
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        final int numMessages = 10;
+
+        for (int i = 0; i < numMessages; i++) {
+            producer.send("hello-" + i);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals(msg.getValue(), "hello-" + i);
+
+            consumer.acknowledge(msg);
+        }
+
+        // Test client with no auth and expect it to fail
+        @Cleanup
+        PulsarClient clientNoAuth = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        try {
+            clientNoAuth.newProducer(Schema.STRING).topic(topic)
+                    .create();
+            fail("Should have failed to create producer");
+        } catch (PulsarClientException e) {
+            // Expected
+        }
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java
new file mode 100644
index 0000000..7f1a03a
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.token;
+
+import com.google.common.io.Files;
+
+import java.io.File;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+
+@Slf4j
+public class TokenAuthWithPublicPrivateKeys extends PulsarTokenAuthenticationBaseSuite {
+
+    private static final String PRIVATE_KEY_PATH_INSIDE_CONTAINER = "/tmp/private.key";
+    private static final String PUBLIC_KEY_PATH_INSIDE_CONTAINER = "/tmp/public.key";
+
+    private File publicKeyFile;
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-key-pair",
+                        "--output-private-key", PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--output-public-key", PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+
+        byte[] publicKeyBytes = DockerUtils
+                .runCommandWithRawOutput(container.getDockerClient(), container.getContainerId(),
+                        "/bin/cat", PUBLIC_KEY_PATH_INSIDE_CONTAINER)
+                .getStdout();
+
+        publicKeyFile = File.createTempFile("public-", ".key", new File("/tmp"));
+        Files.write(publicKeyBytes, publicKeyFile);
+
+        clientAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", REGULAR_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", SUPER_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER,
+                        "--subject", PROXY_ROLE)
+                .getStdout().trim();
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    @Override
+    protected void configureBroker(BrokerContainer brokerContainer) throws Exception {
+        brokerContainer.withFileSystemBind(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        brokerContainer.withEnv("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+    }
+
+    @Override
+    protected void configureProxy(ProxyContainer proxyContainer) throws Exception {
+        proxyContainer.withFileSystemBind(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+        proxyContainer.withEnv("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER);
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java
new file mode 100644
index 0000000..96efa6c
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.auth.token;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ProxyContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+
+@Slf4j
+public class TokenAuthWithSymmetricKeys extends PulsarTokenAuthenticationBaseSuite {
+
+    private String secretKey;
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    protected void createKeysAndTokens(PulsarContainer container) throws Exception {
+        secretKey = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create-secret-key", "--base64")
+                .getStdout();
+        log.info("Created secret key: {}", secretKey);
+
+        clientAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--secret-key", "data:base64," + secretKey,
+                        "--subject", REGULAR_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created client token: {}", clientAuthToken);
+
+        superUserAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--secret-key", "data:base64," + secretKey,
+                        "--subject", SUPER_USER_ROLE)
+                .getStdout().trim();
+        log.info("Created super-user token: {}", superUserAuthToken);
+
+        proxyAuthToken = container
+                .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create",
+                        "--secret-key", "data:base64," + secretKey,
+                        "--subject", PROXY_ROLE)
+                .getStdout().trim();
+        log.info("Created proxy token: {}", proxyAuthToken);
+    }
+
+    @Override
+    protected void configureBroker(BrokerContainer brokerContainer) throws Exception {
+        brokerContainer.withEnv("tokenSecretKey", "data:base64," + secretKey);
+    }
+
+    @Override
+    protected void configureProxy(ProxyContainer proxyContainer) throws Exception {
+        proxyContainer.withEnv("tokenSecretKey", "data:base64," + secretKey);
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
new file mode 100644
index 0000000..0f29f18
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/docker/ContainerExecResultBytes.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.docker;
+
+import lombok.Data;
+
+/**
+ * Represents the result of executing a command.
+ */
+@Data(staticConstructor = "of")
+public class ContainerExecResultBytes {
+
+    private final int exitCode;
+    private final byte[] stdout;
+    private final byte[] stderr;
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index c04b669..803bde7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -452,6 +452,10 @@ public class PulsarCluster {
         return brokerContainers.values();
     }
 
+    public ProxyContainer getProxy() {
+        return proxyContainer;
+    }
+
     public Collection<BKContainer> getBookies() {
         return bookieContainers.values();
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 8fdd968..91fe8ff 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -24,16 +24,19 @@ import com.github.dockerjava.api.DockerClient;
 import com.github.dockerjava.api.async.ResultCallback;
 import com.github.dockerjava.api.command.InspectContainerResponse;
 import com.github.dockerjava.api.command.InspectExecResponse;
-import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.ContainerNetwork;
-
+import com.github.dockerjava.api.model.Frame;
 import com.github.dockerjava.api.model.StreamType;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.OutputStream;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
@@ -42,11 +45,11 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResultBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -246,6 +249,82 @@ public class DockerUtils {
         return result;
     }
 
+    public static ContainerExecResultBytes runCommandWithRawOutput(DockerClient docker,
+            String containerId,
+            String... cmd)
+            throws ContainerExecException {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        String execid = docker.execCreateCmd(containerId)
+                .withCmd(cmd)
+                .withAttachStderr(true)
+                .withAttachStdout(true)
+                .exec()
+                .getId();
+        String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" "));
+        ByteBuf stdout = Unpooled.buffer();
+        ByteBuf stderr = Unpooled.buffer();
+        docker.execStartCmd(execid).withDetach(false)
+                .exec(new ResultCallback<Frame>() {
+                    @Override
+                    public void close() {
+                    }
+
+                    @Override
+                    public void onStart(Closeable closeable) {
+                        LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString);
+                    }
+
+                    @Override
+                    public void onNext(Frame object) {
+                        if (StreamType.STDOUT == object.getStreamType()) {
+                            stdout.writeBytes(object.getPayload());
+                        } else if (StreamType.STDERR == object.getStreamType()) {
+                            stderr.writeBytes(object.getPayload());
+                        }
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        future.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString);
+                        future.complete(true);
+                    }
+                });
+        future.join();
+
+        InspectExecResponse resp = docker.inspectExecCmd(execid).exec();
+        while (resp.isRunning()) {
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(ie);
+            }
+            resp = docker.inspectExecCmd(execid).exec();
+        }
+        int retCode = resp.getExitCode();
+
+        byte[] stdoutBytes = new byte[stdout.readableBytes()];
+        stdout.readBytes(stdoutBytes);
+        byte[] stderrBytes = new byte[stderr.readableBytes()];
+        stderr.readBytes(stderrBytes);
+
+        ContainerExecResultBytes result = ContainerExecResultBytes.of(
+                retCode,
+                stdoutBytes,
+                stderrBytes);
+        LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode);
+
+        if (retCode != 0) {
+            throw new ContainerExecException(cmdString, containerId, null);
+        }
+        return result;
+    }
+
     public static Optional<String> getContainerCluster(DockerClient docker, String containerId) {
         return Optional.ofNullable(docker.inspectContainerCmd(containerId)
                                    .exec().getConfig().getLabels().get("cluster"));

Reply via email to