This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d6709ae  [Issue #6711]: add audience verify in 
AuthenticationProviderToken (#6716)
d6709ae is described below

commit d6709ae987c37ca1bde3305d0bd2662d1e7a8c38
Author: Jia Zhai <[email protected]>
AuthorDate: Thu Apr 16 14:50:17 2020 +0800

    [Issue #6711]: add audience verify in AuthenticationProviderToken (#6716)
    
    Fixes #6711
    
    ### Motivation
    Users would like to be able to configure the JWT authentication provider to verify 
the audience on incoming tokens.  I believe this will improve security because 
it would prevent a spoofer from reusing a token that was intended for another 
purpose (yet signed by the same issuer).  [RFC 7519 section 
4.1.3](https://tools.ietf.org/html/rfc7519#section-4.1.3) has some guidance on 
this.  In my scenario, the token is an OAuth 2.0 token, and OAuth 2.0 makes 
extensive use of the audience claim ([ref [...]
    
    1. a configurable audience claim name (e.g. `aud`).
    2. if audience isn't configured, do not validate the audience (for 
back-compatibility).
    3. if audience is configured, validate that the value is present in the 
token.
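
The three rules above can be sketched roughly as follows. This is a minimal,
standalone illustration and not the code added by this commit: the class
`AudienceCheckSketch`, the method `audienceAccepted`, and the sample values
`my-broker` and `other-service` are hypothetical, and a plain `Map` stands in
for the parsed JWT claims. It assumes the audience claim holds either a single
string or a list of strings.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Pulsar.
public class AudienceCheckSketch {

    // Returns true when the claims pass the audience rules listed above.
    public static boolean audienceAccepted(Map<String, Object> claims,
                                           String audienceClaim,
                                           String expectedAudience) {
        // Rule 2: no audience claim name configured, so nothing is validated.
        if (audienceClaim == null || audienceClaim.trim().isEmpty()) {
            return true;
        }
        // Rule 3: the configured audience must be present in the token.
        Object value = claims.get(audienceClaim);
        if (value instanceof String) {
            return value.equals(expectedAudience);
        }
        if (value instanceof List) {
            return ((List<?>) value).contains(expectedAudience);
        }
        // Claim missing, or not a string / list: reject.
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> claims = new HashMap<>();
        claims.put("sub", "client-a");
        claims.put("aud", Arrays.asList("other-service", "my-broker"));

        System.out.println(audienceAccepted(claims, "aud", "my-broker"));   // true
        System.out.println(audienceAccepted(claims, "aud", "your-broker")); // false
        System.out.println(audienceAccepted(claims, null, null));           // true
    }
}
```

Unlike this sketch, the provider in the diff below fails at initialization
with an `IllegalArgumentException` when `tokenAudienceClaim` is set but
`tokenAudience` is not, rather than rejecting each token.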
    
    ### Modifications
    - Add the logic in AuthenticationProviderToken.
    - Add related tests.
    
    ### Verifying this change
    - Unit tests passed
---
 conf/broker.conf                                   |  11 +-
 conf/proxy.conf                                    |   9 +-
 conf/standalone.conf                               |  15 ++-
 distribution/server/src/assemble/LICENSE.bin.txt   |   6 +-
 pom.xml                                            |   2 +-
 .../AuthenticationProviderToken.java               |  64 +++++++++-
 .../AuthenticationProviderTokenTest.java           | 138 +++++++++++++++++++++
 site2/docs/reference-configuration.md              |   2 +
 8 files changed, 235 insertions(+), 12 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index fa2e726..69cc55f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -120,7 +120,7 @@ brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
 # Topics that are inactive for longer than this value will be deleted
 brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
 
-# Max pending publish requests per connection to avoid keeping large number of 
pending 
+# Max pending publish requests per connection to avoid keeping large number of 
pending
 # requests in memory. Default: 1000
 maxPendingPublishdRequestsPerConnection=1000
 
@@ -460,6 +460,13 @@ tokenPublicKey=
 # The token "claim" that will be interpreted as the authentication "role" or 
"principal" by AuthenticationProviderToken (defaults to "sub" if blank)
 tokenAuthClaim=
 
+# The token audience "claim" name, e.g. "aud", that will be used to get the 
audience from token.
+# If not set, audience will not be verified.
+tokenAudienceClaim=
+
+# The token audience stands for this broker. The field `tokenAudienceClaim` of 
a valid token, need contains this.
+tokenAudience=
+
 ### --- SASL Authentication Provider --- ###
 
 # This is a regexp, which limits the range of possible ids which can connect 
to the Broker using SASL.
@@ -758,7 +765,7 @@ replicationProducerQueueSize=1000
 # Replicator prefix used for replicator producer name and cursor name
 replicatorPrefix=pulsar.repl
 
-# Duration to check replication policy to avoid replicator inconsistency 
+# Duration to check replication policy to avoid replicator inconsistency
 # due to missing ZooKeeper watch (disable with value 0)
 replicatioPolicyCheckDurationSeconds=600
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index b1eb305..40c95b1 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -106,7 +106,7 @@ brokerClientTrustCertsFilePath=
 # Whether TLS is enabled when communicating with Pulsar brokers
 tlsEnabledWithBroker=false
 
-# Tls cert refresh duration in seconds (set 0 to check on every new 
connection) 
+# Tls cert refresh duration in seconds (set 0 to check on every new connection)
 tlsCertRefreshCheckDurationSec=300
 
 ##### --- Rate Limiting --- #####
@@ -192,6 +192,13 @@ tokenPublicKey=
 # The token "claim" that will be interpreted as the authentication "role" or 
"principal" by AuthenticationProviderToken (defaults to "sub" if blank)
 tokenAuthClaim=
 
+# The token audience "claim" name, e.g. "aud", that will be used to get the 
audience from token.
+# If not set, audience will not be verified.
+tokenAudienceClaim=
+
+# The token audience stands for this broker. The field `tokenAudienceClaim` of 
a valid token, need contains this.
+tokenAudience=
+
 ### --- Deprecated config variables --- ###
 
 # Deprecated. Use configurationStoreServers
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 1e60acc..b2b0e83 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -81,7 +81,7 @@ brokerDeleteInactiveTopicsEnabled=true
 # How often to check for inactive topics
 brokerDeleteInactiveTopicsFrequencySeconds=60
 
-# Max pending publish requests per connection to avoid keeping large number of 
pending 
+# Max pending publish requests per connection to avoid keeping large number of 
pending
 # requests in memory. Default: 1000
 maxPendingPublishdRequestsPerConnection=1000
 
@@ -179,8 +179,8 @@ dispatchThrottlingRatePerTopicInMsg=0
 # default message-byte dispatch-throttling
 dispatchThrottlingRatePerTopicInByte=0
 
-# Dispatch rate-limiting relative to publish rate. 
-# (Enabling flag will make broker to dynamically update dispatch-rate 
relatively to publish-rate: 
+# Dispatch rate-limiting relative to publish rate.
+# (Enabling flag will make broker to dynamically update dispatch-rate 
relatively to publish-rate:
 # throttle-dispatch-rate = (publish-rate + configured dispatch-rate).
 dispatchThrottlingRateRelativeToPublishRate=false
 
@@ -265,6 +265,13 @@ anonymousUserRole=
 # The token "claim" that will be interpreted as the authentication "role" or 
"principal" by AuthenticationProviderToken (defaults to "sub" if blank)
 tokenAuthClaim=
 
+# The token audience "claim" name, e.g. "aud", that will be used to get the 
audience from token.
+# If not set, audience will not be verified.
+tokenAudienceClaim=
+
+# The token audience stands for this broker. The field `tokenAudienceClaim` of 
a valid token, need contains this.
+tokenAudience=
+
 ### --- BookKeeper Client --- ###
 
 # Authentication plugin to use when connecting to bookies
@@ -505,7 +512,7 @@ replicationConnectionsPerBroker=16
 # Replicator producer queue size
 replicationProducerQueueSize=1000
 
-# Duration to check replication policy to avoid replicator inconsistency 
+# Duration to check replication policy to avoid replicator inconsistency
 # due to missing ZooKeeper watch (disable with value 0)
 replicatioPolicyCheckDurationSeconds=600
 
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 827864d..fc90414 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -485,9 +485,9 @@ The Apache Software License, Version 2.0
   * Prometheus
     - io.prometheus-simpleclient_httpserver-0.5.0.jar
   * Java JSON WebTokens
-    - io.jsonwebtoken-jjwt-api-0.10.5.jar
-    - io.jsonwebtoken-jjwt-impl-0.10.5.jar
-    - io.jsonwebtoken-jjwt-jackson-0.10.5.jar
+    - io.jsonwebtoken-jjwt-api-0.11.1.jar
+    - io.jsonwebtoken-jjwt-impl-0.11.1.jar
+    - io.jsonwebtoken-jjwt-jackson-0.11.1.jar
   * JavaX Injection
     - javax.inject-javax.inject-1.jar
   * JCTools - Java Concurrency Tools for the JVM
diff --git a/pom.xml b/pom.xml
index efaa164..588b7e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,7 +200,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <flink.version>1.6.0</flink.version>
     <scala.binary.version>2.11</scala.binary.version>
     <debezium.version>1.0.0.Final</debezium.version>
-    <jsonwebtoken.version>0.10.5</jsonwebtoken.version>
+    <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <zstd.version>1.3.7-3</zstd.version>
     <snappy.version>1.1.1.3</snappy.version>
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index 87a15a2..da21a8e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.Key;
 
+import java.util.List;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
@@ -56,11 +57,19 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
     // When using public key's, the algorithm of the key
     final static String CONF_TOKEN_PUBLIC_ALG = "tokenPublicAlg";
 
+    // The token audience "claim" name, e.g. "aud", that will be used to get 
the audience from token.
+    final static String CONF_TOKEN_AUDIENCE_CLAIM = "tokenAudienceClaim";
+
+    // The token audience stands for this broker. The field 
`tokenAudienceClaim` of a valid token, need contains this.
+    final static String CONF_TOKEN_AUDIENCE = "tokenAudience";
+
     final static String TOKEN = "token";
 
     private Key validationKey;
     private String roleClaim;
     private SignatureAlgorithm publicKeyAlg;
+    private String audienceClaim;
+    private String audience;
 
     @Override
     public void close() throws IOException {
@@ -73,6 +82,13 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
         this.publicKeyAlg = getPublicKeyAlgType(config);
         this.validationKey = getValidationKey(config);
         this.roleClaim = getTokenRoleClaim(config);
+        this.audienceClaim = getTokenAudienceClaim(config);
+        this.audience = getTokenAudience(config);
+
+        if (audienceClaim != null && audience == null ) {
+            throw new IllegalArgumentException("Token Audience Claim [" + 
audienceClaim
+                                               + "] configured, but Audience 
stands for this broker not.");
+        }
     }
 
     @Override
@@ -126,9 +142,35 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
     @SuppressWarnings("unchecked")
     private Jwt<?, Claims> authenticateToken(final String token) throws 
AuthenticationException {
         try {
-            return Jwts.parser()
+            Jwt<?, Claims> jwt = Jwts.parser()
                     .setSigningKey(validationKey)
                     .parse(token);
+
+            if (audienceClaim != null) {
+                Object object = jwt.getBody().get(audienceClaim);
+                if (object == null) {
+                    throw new JwtException("Found null Audience in token, for 
claimed field: " + audienceClaim);
+                }
+
+                if (object instanceof List) {
+                    List<String> audiences = (List<String>) object;
+                    // audience not contains this broker, throw exception.
+                    if (!audiences.stream().anyMatch(audienceInToken -> 
audienceInToken.equals(audience))) {
+                        throw new AuthenticationException("Audiences in token: 
[" + String.join(", ", audiences)
+                                                          + "] not contains 
this broker: " + audience);
+                    }
+                } else if (object instanceof String) {
+                    if (!object.equals(audience)) {
+                        throw new AuthenticationException("Audiences in token: 
[" + object
+                                                          + "] not contains 
this broker: " + audience);
+                    }
+                } else {
+                    // should not reach here.
+                    throw new AuthenticationException("Audiences in token is 
not in expected format: " + object);
+                }
+            }
+
+            return jwt;
         } catch (JwtException e) {
             throw new AuthenticationException("Failed to authentication token: 
" + e.getMessage());
         }
@@ -180,6 +222,26 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
         }
     }
 
+    // get Token Audience Claim from configuration, if not configured return 
null.
+    private String getTokenAudienceClaim(ServiceConfiguration conf) throws 
IllegalArgumentException {
+        if (conf.getProperty(CONF_TOKEN_AUDIENCE_CLAIM) != null
+            && StringUtils.isNotBlank((String) 
conf.getProperty(CONF_TOKEN_AUDIENCE_CLAIM))) {
+            return (String) conf.getProperty(CONF_TOKEN_AUDIENCE_CLAIM);
+        } else {
+            return null;
+        }
+    }
+
+    // get Token Audience that stands for this broker from configuration, if 
not configured return null.
+    private String getTokenAudience(ServiceConfiguration conf) throws 
IllegalArgumentException {
+        if (conf.getProperty(CONF_TOKEN_AUDIENCE) != null
+            && StringUtils.isNotBlank((String) 
conf.getProperty(CONF_TOKEN_AUDIENCE))) {
+            return (String) conf.getProperty(CONF_TOKEN_AUDIENCE);
+        } else {
+            return null;
+        }
+    }
+
     private static final class TokenAuthenticationState implements 
AuthenticationState {
         private final AuthenticationProviderToken provider;
         private AuthenticationDataSource authenticationDataSource;
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index 3cc040d..60fa5f6 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -25,12 +25,16 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
 import io.jsonwebtoken.Claims;
 import io.jsonwebtoken.Jwt;
+import io.jsonwebtoken.JwtBuilder;
 import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.jsonwebtoken.io.Decoders;
 import io.jsonwebtoken.security.Keys;
+import java.security.Key;
+import java.util.List;
 import lombok.Cleanup;
 
 import java.io.File;
@@ -632,4 +636,138 @@ public class AuthenticationProviderTokenTest {
         AuthData brokerData = authState.refreshAuthentication();
         assertNull(brokerData);
     }
+
+    // tests for Token Audience
+    @Test
+    public void testRightTokenAudienceClaim() throws Exception {
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
"aud");
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience);
+
+        testTokenAudienceWithDifferentConfig(properties, brokerAudience);
+    }
+
+    @Test(expectedExceptions = AuthenticationException.class)
+    public void testWrongTokenAudience() throws Exception {
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
"aud");
+        // set wrong audience in token, should throw exception.
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience + "-wrong");
+        testTokenAudienceWithDifferentConfig(properties, brokerAudience);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public void testNoBrokerTokenAudience() throws Exception {
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+
+        Properties properties = new Properties();
+        // Not set broker audience, should throw exception.
+        
//properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience);
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
"aud");
+        testTokenAudienceWithDifferentConfig(properties, brokerAudience);
+    }
+
+    @Test
+    public void testSelfDefineTokenAudienceClaim() throws Exception {
+        String audienceClaim = "audience_claim_" + System.currentTimeMillis();
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience);
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
audienceClaim);
+        testTokenAudienceWithDifferentConfig(properties, audienceClaim, 
Lists.newArrayList(brokerAudience));
+    }
+
+    @Test(expectedExceptions = AuthenticationException.class)
+    public void testWrongSelfDefineTokenAudienceClaim() throws Exception {
+        String audienceClaim = "audience_claim_" + System.currentTimeMillis();
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+
+        Properties properties = new Properties();
+        // Set wrong broker audience, should throw exception.
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience);
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
audienceClaim);
+        testTokenAudienceWithDifferentConfig(properties,
+                audienceClaim + "_wrong",
+                Lists.newArrayList(brokerAudience));
+    }
+
+    @Test
+    public void testMultiTokenAudience() throws Exception {
+        String audienceClaim = "audience_claim_" + System.currentTimeMillis();
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+
+        List<String> audiences = Lists.newArrayList("AnotherBrokerAudience", 
brokerAudience);
+
+        Properties properties = new Properties();
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience);
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
audienceClaim);
+        testTokenAudienceWithDifferentConfig(properties, audienceClaim, 
audiences);
+    }
+
+    @Test(expectedExceptions = AuthenticationException.class)
+    public void testMultiTokenAudienceNotInclude() throws Exception {
+        String audienceClaim = "audience_claim_" + System.currentTimeMillis();
+        String brokerAudience = "testBroker_" + System.currentTimeMillis();
+
+        List<String> audiences = Lists.newArrayList("AnotherBrokerAudience", 
brokerAudience + "_wrong");
+
+        Properties properties = new Properties();
+        // Broker audience not included in token's audiences, should throw 
exception.
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE, 
brokerAudience);
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM, 
audienceClaim);
+        testTokenAudienceWithDifferentConfig(properties, audienceClaim, 
audiences);
+    }
+
+    private static String createTokenWithAudience(Key signingKey, String 
audienceClaim, List<String> audience) {
+        JwtBuilder builder = Jwts.builder()
+                .setSubject(SUBJECT)
+                .signWith(signingKey);
+
+        builder.claim(audienceClaim, audience);
+        return builder.compact();
+    }
+
+    private static void testTokenAudienceWithDifferentConfig(Properties 
properties,
+                                                             String 
brokerAudience) throws Exception {
+        testTokenAudienceWithDifferentConfig(properties,
+                "aud",
+                Lists.newArrayList(brokerAudience));
+    }
+
+    private static void testTokenAudienceWithDifferentConfig(Properties 
properties,
+                                                             String 
audienceClaim, List<String> audiences) throws Exception {
+        @Cleanup
+        AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+        SecretKey secretKey = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        File secretKeyFile = 
File.createTempFile("pulsar-test-secret-key-valid", ".key");
+        secretKeyFile.deleteOnExit();
+        Files.write(Paths.get(secretKeyFile.toString()), 
secretKey.getEncoded());
+
+        
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY, 
secretKeyFile.toString());
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        String token = createTokenWithAudience(secretKey, audienceClaim, 
audiences);
+
+        // Pulsar protocol auth
+        String subject = provider.authenticate(new AuthenticationDataSource() {
+            @Override
+            public boolean hasDataFromCommand() {
+                return true;
+            }
+
+            @Override
+            public String getCommandData() {
+                return token;
+            }
+        });
+        assertEquals(subject, SUBJECT);
+        provider.close();
+    }
 }
diff --git a/site2/docs/reference-configuration.md 
b/site2/docs/reference-configuration.md
index c4ed57a..7c7d348 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -155,6 +155,8 @@ Pulsar brokers are responsible for handling incoming 
messages from producers, di
 |tokenPublicKey| Configure the public key to be used to validate auth tokens. 
The key can be specified like: `tokenPublicKey=data:base64,xxxxxxxxx` or 
`tokenPublicKey=file:///my/secret.key`||
 |tokenPublicAlg| Configure the algorithm to be used to validate auth tokens. 
This can be any of the asymettric algorithms supported by Java JWT 
(https://github.com/jwtk/jjwt#signature-algorithms-keys) |RS256|
 |tokenAuthClaim| Specify which of the token's claims will be used as the 
authentication "principal" or "role". The default "sub" claim will be used if 
this is left blank ||
+|tokenAudienceClaim| The token audience "claim" name, e.g. "aud", that will be 
used to get the audience from token. If not set, audience will not be verified. 
||
+|tokenAudience| The token audience stands for this broker. The field 
`tokenAudienceClaim` of a valid token, need contains this. ||
 |maxUnackedMessagesPerConsumer| Max number of unacknowledged messages allowed 
to receive messages by a consumer on a shared subscription. Broker will stop 
sending messages to consumer once, this limit reaches until consumer starts 
acknowledging messages back. Using a value of 0, is disabling unackeMessage 
limit check and consumer can receive messages without any restriction  |50000|
 |maxUnackedMessagesPerSubscription| Max number of unacknowledged messages 
allowed per shared subscription. Broker will stop dispatching messages to all 
consumers of the subscription once this limit reaches until consumer starts 
acknowledging messages back and unack count reaches to limit/2. Using a value 
of 0, is disabling unackedMessage-limit check and dispatcher can dispatch 
messages without any restriction  |200000|
 |subscriptionRedeliveryTrackerEnabled| Enable subscription message redelivery 
tracker |true|
