SENTRY-1030: Restrict Kafka Cluster authorizable to only have "kafka-cluster" 
as authorizable's name. (Ashish K Singh, reviewed by: Dapeng Sun and Hao Hao)

Change-Id: I0be60422a85ba783a825a71cd677820dbbc388fa


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/734e1905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/734e1905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/734e1905

Branch: refs/heads/master
Commit: 734e190526aebfd1f199b705e16ebc6f40d31f32
Parents: 51f4e8d
Author: hahao <[email protected]>
Authored: Wed Mar 2 10:55:05 2016 -0800
Committer: hahao <[email protected]>
Committed: Mon Mar 21 23:16:53 2016 -0700

----------------------------------------------------------------------
 .../apache/sentry/core/model/kafka/Cluster.java   | 15 +++------------
 .../core/model/kafka/TestKafkaAuthorizable.java   |  6 +++---
 .../policy/kafka/KafkaModelAuthorizables.java     | 15 ++++++++++-----
 .../policy/kafka/TestKafkaModelAuthorizables.java | 18 ++++++++++++++++--
 .../policy/kafka/TestKafkaPrivilegeValidator.java |  8 ++++----
 ...estKafkaAuthorizationProviderGeneralCases.java |  2 +-
 6 files changed, 37 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/734e1905/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
index bb30b1b..edf36c8 100644
--- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
+++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/Cluster.java
@@ -20,16 +20,7 @@ package org.apache.sentry.core.model.kafka;
  * Represents Cluster authorizable in Kafka model.
  */
 public class Cluster implements KafkaAuthorizable {
-  private String name;
-
-  /**
-   * Create a Cluster authorizable for Kafka cluster of a given name.
-   *
-   * @param name Name of Kafka cluster.
-   */
-  public Cluster(String name) {
-    this.name = name;
-  }
+  public static final String NAME = "kafka-cluster";
 
   /**
    * Get type of Kafka's cluster authorizable.
@@ -48,7 +39,7 @@ public class Cluster implements KafkaAuthorizable {
    */
   @Override
   public String getName() {
-    return name;
+    return NAME;
   }
 
   /**
@@ -60,4 +51,4 @@ public class Cluster implements KafkaAuthorizable {
   public String getTypeName() {
     return getAuthzType().name();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/734e1905/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
----------------------------------------------------------------------
diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
index 20d5e8e..81446a7 100644
--- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
+++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java
@@ -37,8 +37,8 @@ public class TestKafkaAuthorizable {
     Host host = new Host(name);
     Assert.assertEquals(host.getName(), name);
 
-    Cluster cluster = new Cluster(name);
-    Assert.assertEquals(cluster.getName(), name);
+    Cluster cluster = new Cluster();
+    Assert.assertEquals(cluster.getName(), Cluster.NAME);
 
     Topic topic = new Topic(name);
     Assert.assertEquals(topic.getName(), name);
@@ -52,7 +52,7 @@ public class TestKafkaAuthorizable {
     Host host = new Host("host1");
     Assert.assertEquals(host.getAuthzType(), AuthorizableType.HOST);
 
-    Cluster cluster = new Cluster("cluster1");
+    Cluster cluster = new Cluster();
     Assert.assertEquals(cluster.getAuthzType(), AuthorizableType.CLUSTER);
 
     Topic topic = new Topic("topic1");

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/734e1905/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
index f1ed000..1da1193 100644
--- a/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
+++ b/sentry-policy/sentry-policy-kafka/src/main/java/org/apache/sentry/policy/kafka/KafkaModelAuthorizables.java
@@ -23,9 +23,10 @@ import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType;
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.Topic;
 import org.apache.sentry.provider.common.KeyValue;
+import org.apache.shiro.config.ConfigurationException;
 
 public class KafkaModelAuthorizables {
-  public static KafkaAuthorizable from(KeyValue keyValue) {
+  public static KafkaAuthorizable from(KeyValue keyValue) throws ConfigurationException {
     String prefix = keyValue.getKey().toLowerCase();
     String name = keyValue.getValue();
     for (AuthorizableType type : AuthorizableType.values()) {
@@ -36,16 +37,20 @@ public class KafkaModelAuthorizables {
     return null;
   }
 
-  public static KafkaAuthorizable from(String keyValue) {
+  public static KafkaAuthorizable from(String keyValue) throws ConfigurationException {
     return from(new KeyValue(keyValue));
   }
 
-  public static KafkaAuthorizable from(AuthorizableType type, String name) {
+  public static KafkaAuthorizable from(AuthorizableType type, String name) throws ConfigurationException {
     switch (type) {
       case HOST:
         return new Host(name);
-      case CLUSTER:
-        return new Cluster(name);
+      case CLUSTER: {
+        if (!name.equals(Cluster.NAME)) {
+          throw new ConfigurationException("Kafka's cluster resource can only have name " + Cluster.NAME);
+        }
+        return new Cluster();
+      }
       case TOPIC:
         return new Topic(name);
       case CONSUMERGROUP:

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/734e1905/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
index 513c271..6a18148 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java
@@ -20,11 +20,13 @@ package org.apache.sentry.policy.kafka;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.fail;
 
 import org.apache.sentry.core.model.kafka.Cluster;
 import org.apache.sentry.core.model.kafka.ConsumerGroup;
 import org.apache.sentry.core.model.kafka.Host;
 import org.apache.sentry.core.model.kafka.Topic;
+import org.apache.shiro.config.ConfigurationException;
 import org.junit.Test;
 
 public class TestKafkaModelAuthorizables {
@@ -60,8 +62,8 @@ public class TestKafkaModelAuthorizables {
     Host host1 = (Host)KafkaModelAuthorizables.from("HOST=Host1");
     assertEquals("Host1", host1.getName());
 
-    Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=cLuster1");
-    assertEquals("cLuster1", cluster1.getName());
+    Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=kafka-cluster");
+    assertEquals("kafka-cluster", cluster1.getName());
 
     Topic topic1 = (Topic)KafkaModelAuthorizables.from("topic=topiC1");
     assertEquals("topiC1", topic1.getName());
@@ -69,4 +71,16 @@ public class TestKafkaModelAuthorizables {
     ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1");
     assertEquals("CG1", consumergroup1.getName());
   }
+
+  @Test
+  public void testClusterResourceNameIsRestricted() throws Exception {
+    try {
+      Cluster cluster1 = (Cluster) KafkaModelAuthorizables.from("Cluster=cluster1");
+      fail("Cluster with name other than " + Cluster.NAME + " must not have been created.");
+    } catch (ConfigurationException cex) {
+      assertEquals("Exception message is not as expected.", "Kafka's cluster resource can only have name " + Cluster.NAME, cex.getMessage());
+    } catch (Exception ex) {
+      fail("Configuration exception was expected for invalid Cluster name.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/734e1905/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
index 9e58895..7caa3a9 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPrivilegeValidator.java
@@ -38,7 +38,7 @@ public class TestKafkaPrivilegeValidator {
   @Test
   public void testWithoutHostResource() throws Exception {
     KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
-    testHostResourceIsChecked(kafkaPrivilegeValidator, "cluster=c1->action=read");
+    testHostResourceIsChecked(kafkaPrivilegeValidator, "cluster=kafka-cluster->action=read");
     testHostResourceIsChecked(kafkaPrivilegeValidator, "topic=t1->action=read");
     testHostResourceIsChecked(kafkaPrivilegeValidator, "consumergroup=g1->action=read");
   }
@@ -56,7 +56,7 @@ public class TestKafkaPrivilegeValidator {
   public void testValidPrivileges() throws Exception {
     KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
     try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->cluster=c1->action=read"));
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->cluster=kafka-cluster->action=read"));
     } catch (ConfigurationException ex) {
       Assert.fail("Not expected ConfigurationException");
     }
@@ -76,7 +76,7 @@ public class TestKafkaPrivilegeValidator {
   public void testInvalidHostResource() throws Exception {
     KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
     try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("hhost=host1->cluster=c1->action=read"));
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("hhost=host1->cluster=kafka-cluster->action=read"));
       Assert.fail("Expected ConfigurationException");
     } catch (ConfigurationException ex) {
     }
@@ -86,7 +86,7 @@ public class TestKafkaPrivilegeValidator {
   public void testInvalidClusterResource() throws Exception {
     KafkaPrivilegeValidator kafkaPrivilegeValidator = new KafkaPrivilegeValidator();
     try {
-      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->clluster=c1->action=read"));
+      kafkaPrivilegeValidator.validate(new PrivilegeValidatorContext("host=host1->clluster=kafka-cluster->action=read"));
       Assert.fail("Expected ConfigurationException");
     } catch (ConfigurationException ex) {
     }

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/734e1905/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
----------------------------------------------------------------------
diff --git a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
index bcc1198..dc7ade2 100644
--- a/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
+++ b/sentry-policy/sentry-policy-kafka/src/test/java/org/apache/sentry/policy/kafka/provider/TestKafkaAuthorizationProviderGeneralCases.java
@@ -56,7 +56,7 @@ public class TestKafkaAuthorizationProviderGeneralCases {
 
   private static final Host HOST_1 = new Host("host1");
   private static final Host HOST_2 = new Host("host2");
-  private static final Cluster cluster1 = new Cluster("kafka-cluster");
+  private static final Cluster cluster1 = new Cluster();
   private static final Topic topic1 = new Topic("t1");
   private static final Topic topic2 = new Topic("t2");
   private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1");

Reply via email to