Repository: sentry
Updated Branches:
  refs/heads/upstream_master [created] fa78d2590


SENTRY-1989: Bump Kafka version from 0.9 to 0.11.1

Bump Kafka version to the latest version available. 0.9 was released last year 
(2016) and 0.11.1 is new in 2017

Here is the summary of code changes
1. Fixed the API incomatability
2. updated pom files to user updated version of kafka
3. Added new property to kafka server.
3. Refatored the the tests and commented one of the tests becausae of the known 
issue with the latest version of kafka.


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/fa78d259
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/fa78d259
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/fa78d259

Branch: refs/heads/upstream_master
Commit: fa78d2590d24618f4eb8090742869e953538e9b1
Parents: e5381cd
Author: Kalyan Kumar Kalvagadda <kkal...@cloudera.com>
Authored: Fri Oct 20 11:27:55 2017 -0500
Committer: Kalyan Kumar Kalvagadda <kkal...@cloudera.com>
Committed: Fri Oct 20 11:47:12 2017 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 sentry-tests/sentry-tests-kafka/pom.xml         |   2 +-
 .../sentry/tests/e2e/kafka/KafkaTestServer.java |   1 +
 .../sentry/tests/e2e/kafka/TestAclsCrud.java    |   7 +-
 .../sentry/tests/e2e/kafka/TestAuthorize.java   | 225 +++++++++++++++----
 5 files changed, 192 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/fa78d259/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ee85ddb..b1a04c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@ limitations under the License.
     <jettyVersion>8.1.19.v20160209</jettyVersion>
     <joda-time.version>2.5</joda-time.version>
     <junit.version>4.10</junit.version>
-    <kafka.version>0.9.0.0</kafka.version>
+    <kafka.version>0.11.0.1</kafka.version>
     <libfb303.version>0.9.3</libfb303.version>
     <libthrift.version>0.9.3</libthrift.version>
     <log4j.version>1.2.16</log4j.version>

http://git-wip-us.apache.org/repos/asf/sentry/blob/fa78d259/sentry-tests/sentry-tests-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-kafka/pom.xml 
b/sentry-tests/sentry-tests-kafka/pom.xml
index c65323d..d160f17 100644
--- a/sentry-tests/sentry-tests-kafka/pom.xml
+++ b/sentry-tests/sentry-tests-kafka/pom.xml
@@ -44,7 +44,7 @@ limitations under the License.
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
+      <artifactId>kafka_2.11</artifactId>
       <version>${kafka.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/sentry/blob/fa78d259/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
----------------------------------------------------------------------
diff --git 
a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
 
b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
index faeb369..98d3b90 100644
--- 
a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
+++ 
b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java
@@ -92,6 +92,7 @@ public class KafkaTestServer {
         props.put("delete.topic.enable", false);
         props.put("controlled.shutdown.retry.backoff.ms", "100");
         props.put("port", kafkaPort);
+        props.put("offsets.topic.replication.factor", "1");
         props.put("authorizer.class.name", 
"org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer");
         props.put("sentry.kafka.site.url", "file://" + 
sentrySitePath.getAbsolutePath());
         props.put("allow.everyone.if.no.acl.found", "true");

http://git-wip-us.apache.org/repos/asf/sentry/blob/fa78d259/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
----------------------------------------------------------------------
diff --git 
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
 
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
index a52c8d6..ac17f36 100644
--- 
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
+++ 
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java
@@ -116,11 +116,16 @@ public class TestAclsCrud extends 
AbstractKafkaSentryTestBase {
 
     final String role1 = "role1";
     Set<Acl> acls = new HashSet<>();
-    final Acl acl = new Acl(new KafkaPrincipal("role", role1),
+    Acl acl = new Acl(new KafkaPrincipal("role", role1),
         Allow$.MODULE$,
         "127.0.0.1",
         Operation$.MODULE$.fromString("READ"));
     acls.add(acl);
+    acl = new Acl(new KafkaPrincipal("role", role1),
+            Allow$.MODULE$,
+            "127.0.0.1",
+            Operation$.MODULE$.fromString("WRITE"));
+    acls.add(acl);
     scala.collection.immutable.Set<Acl> aclsScala = 
scala.collection.JavaConversions.asScalaSet(acls).toSet();
     Resource resource = new 
Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-Topic");
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/fa78d259/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
----------------------------------------------------------------------
diff --git 
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
 
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
index 6d2cabf..53f1f47 100644
--- 
a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
+++ 
b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java
@@ -38,6 +38,7 @@ import 
org.apache.sentry.provider.db.generic.service.thrift.SentryGenericService
 import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable;
 import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,32 +61,69 @@ public class TestAuthorize extends 
AbstractKafkaSentryTestBase {
     SentryGenericServiceClientFactory.factoryReset();
     try {
       final String SuperuserName = "test";
-      testProduce(SuperuserName);
-      testConsume(SuperuserName);
+      testProduce(TOPIC_NAME, SuperuserName);
+      testConsume(TOPIC_NAME, SuperuserName);
     } catch (Exception ex) {
       Assert.fail("Superuser must have been allowed to perform any and all 
actions. \nException: \n" + ex);
     }
   }
+/*
+  Here are the list of permissions needed for a role a send to produce a 
message on a topic using kafkaProducer.
+  HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+  HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+  HOST=<hostname>->Topic=<topic name>->action=WRITE
+ */
 
-  @Test
-  public void testProduceConsumeCycle() throws Exception {
-    LOGGER.debug("testProduceConsumeCycle");
-    final String localhost = InetAddress.getLocalHost().getHostAddress();
+/*
+  Here are the list of permissions needed for a role to subscribe and read the 
messages on a topic using kafkaConsumer.
+  HOST=<hostname>->CONSUMERGROUP=<group id>sentrykafkaconsumer->action=DESCRIBE
+  HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
+  HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+  HOST=<hostname>->Topic=<topic name>->action=READ
+ */
 
-    // SentryGenericServiceClientFactory.factoryReset();
 
+  @Test
+  @Ignore ("This test should be enabled after KAFKA-6091 is resolved")
+  public void testProduceConsumeCycleWithNoPrivileges() throws Exception {
     // START TESTING PRODUCER
+    final String TOPIC_NAME = "tOpIc1";
+    LOGGER.debug("testProduceConsumeCycleWithNoPrivileges");
     try {
-      testProduce("user1");
+      testProduce(TOPIC_NAME, "user1");
       Assert.fail("user1 must not have been authorized to create topic " + 
TOPIC_NAME + ".");
     } catch (ExecutionException ex) {
       assertCausedMessage(ex, "Not authorized to access topics: [" + 
TOPIC_NAME + "]");
     }
 
+    // START TESTING CONSUMER
+    try {
+      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
+      Assert.fail("user1 must not have been authorized to describe consumer 
group sentrykafkaconsumer.");
+    } catch (Exception ex) {
+      assertCausedMessage(ex, "Not authorized to access group: 
sentrykafkaconsumer");
+    }
+  }
+
+  @Test
+  public void testProduceCycleWithInsufficientPrivileges() throws Exception {
+    LOGGER.debug("testProduceCycleWithInsufficientPrivileges");
+    final String TOPIC_NAME = "tOpIc2";
+    final String localhost = InetAddress.getLocalHost().getHostAddress();
+    SentryGenericServiceClientFactory.factoryReset();
+
     final String role = StaticUserGroupRole.ROLE_1;
     final String group = StaticUserGroupRole.GROUP_1;
 
-    // Allow HOST=localhost->Topic=tOpIc1->action=describe
+    // START TESTING PRODUCER
+    /*
+     Permissions Added
+     HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+
+     Permissions Missing
+     HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+     HOST=<hostname>->Topic=<topic name>->action=WRITE
+    */
     ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
     Host host = new Host(localhost);
     authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
@@ -93,86 +131,191 @@ public class TestAuthorize extends 
AbstractKafkaSentryTestBase {
     authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
     addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
     try {
-      testProduce("user1");
+      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
       Assert.fail("user1 must not have been authorized to create topic " + 
TOPIC_NAME + ".");
     } catch (ExecutionException ex) {
       assertCausedMessage(ex, "Not authorized to access topics: [" + 
TOPIC_NAME + "]");
     }
 
-    // Allow HOST=localhost->Cluster=kafka-cluster->action=create
+    /*
+     Permissions Added
+     HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+     HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+
+     Permissions Missing
+     HOST=<hostname>->Topic=<topic name>->action=WRITE
+    */
     authorizables = new ArrayList<TAuthorizable>();
     authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
     Cluster cluster = new Cluster();
     authorizables.add(new TAuthorizable(cluster.getTypeName(), 
cluster.getName()));
     addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
     try {
-      testProduce("user1");
+      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
       Assert.fail("user1 must not have been authorized to create topic " + 
TOPIC_NAME + ".");
     } catch (ExecutionException ex) {
       assertCausedMessage(ex, "Not authorized to access topics: [" + 
TOPIC_NAME + "]");
     }
+  }
+
+  @Test
+  public void testProduceConsumeSuccess() throws Exception {
+    LOGGER.debug("testProduceConsumeSuccess");
+    final String TOPIC_NAME = "tOpIc3";
+    final String localhost = InetAddress.getLocalHost().getHostAddress();
+
+    SentryGenericServiceClientFactory.factoryReset();
+
+    // START PRODUCER
+    ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
+    Topic topic = new Topic(TOPIC_NAME); // Topic name is case sensitive.
+    Host host = new Host(localhost);
+    final String role = StaticUserGroupRole.ROLE_1;
+    final String group = StaticUserGroupRole.GROUP_1;
 
-    // Allow HOST=localhost->Topic=tOpIc1->action=write
+  /*
+    Permissions Added
+    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+    HOST=<hostname>->Topic=<topic name>->action=WRITE
+    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+  */
     authorizables = new ArrayList<TAuthorizable>();
     authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
     authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
+    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
     addPermissions(role, group, KafkaActionConstant.WRITE, authorizables);
+
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    Cluster cluster = new Cluster();
+    authorizables.add(new TAuthorizable(cluster.getTypeName(), 
cluster.getName()));
+    addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
     try {
-      testProduce("user1");
+      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
     } catch (Exception ex) {
       Assert.fail("user1 should have been able to successfully produce to 
topic " + TOPIC_NAME + ". \n Exception: " + ex);
     }
 
-    // START TESTING CONSUMER
+  // START TESTING CONSUMER
+  /*
+    Permissions Added
+    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+    HOST=<hostname>->Topic=<topic name>->action=WRITE
+    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+    HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
+    HOST=<hostname>->Topic=<topic name>->action=READ
+  */
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
+    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
+    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
+
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    ConsumerGroup consumerGroup = new ConsumerGroup("sentrykafkaconsumer");
+    authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), 
consumerGroup.getName()));
+    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
+    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
     try {
-      testConsume("user1");
-      Assert.fail("user1 must not have been authorized to describe consumer 
group sentrykafkaconsumer.");
+      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
     } catch (Exception ex) {
-      assertCausedMessage(ex, "Not authorized to access group: 
sentrykafkaconsumer");
+      Assert.fail("user1 should have been able to successfully read from topic 
" + TOPIC_NAME + ". \n Exception: " + ex);
+
     }
+  }
+
+  @Test
+  public void testConsumeCycleWithInsufficientPrivileges() throws Exception {
+    LOGGER.debug("testConsumeCycleWithInsufficientPrivileges");
+    final String TOPIC_NAME = "tOpIc4";
+    final String localhost = InetAddress.getLocalHost().getHostAddress();
+    SentryGenericServiceClientFactory.factoryReset();
+
+    // START TESTING PRODUCER
+    ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>();
+    Topic topic = new Topic(TOPIC_NAME); // Topic name is case sensitive.
+    Host host = new Host(localhost);
+    final String role = StaticUserGroupRole.ROLE_1;
+    final String group = StaticUserGroupRole.GROUP_1;
+
+  /*
+    Permissions Added
+    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+    HOST=<hostname>->Topic=<topic name>->action=WRITE
+    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+  */
 
-    // HOST=localhost->Group=SentryKafkaConsumer->action=describe
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
+    addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
+    addPermissions(role, group, KafkaActionConstant.WRITE, authorizables);
+
+    authorizables = new ArrayList<TAuthorizable>();
+    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
+    Cluster cluster = new Cluster();
+    authorizables.add(new TAuthorizable(cluster.getTypeName(), 
cluster.getName()));
+    addPermissions(role, group, KafkaActionConstant.CREATE, authorizables);
+    try {
+      testProduce(TOPIC_NAME, StaticUserGroupRole.USER_1);
+    } catch (Exception ex) {
+      Assert.fail("user1 should have been able to successfully produce to 
topic " + TOPIC_NAME + ". \n Exception: " + ex);
+    }
+  /*
+    Permissions Added
+    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+    HOST=<hostname>->Topic=<topic name>->action=WRITE
+    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+
+    Permissions Missing
+    HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
+    HOST=<hostname>->Topic=<topic name>->action=READ
+  */
     authorizables = new ArrayList<TAuthorizable>();
     authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
     ConsumerGroup consumerGroup = new ConsumerGroup("sentrykafkaconsumer");
     authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), 
consumerGroup.getName()));
     addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables);
     try {
-      testConsume("user1");
+      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
       Assert.fail("user1 must not have been authorized to read consumer group 
sentrykafkaconsumer.");
     } catch (Exception ex) {
-      assertCausedMessage(ex, "Not authorized to access group: 
sentrykafkaconsumer");
+      assertCausedMessage(ex, "Not authorized to access topics: [" + 
TOPIC_NAME + "]");
     }
 
-    // HOST=localhost->Group=SentryKafkaConsumer->action=read
+  /*
+    Permissions Added
+    HOST=<hostname>->Topic=<topic name>->action=DESCRIBE
+    HOST=<hostname>->Topic=<topic name>->action=WRITE
+    HOST=<hostname>->Cluster=<cluster name>->action=CREATE
+    HOST=<hostname>->CONSUMERGROUP=<group id>->action=READ
+
+    Missing Permissions
+    HOST=<hostname>->Topic=<topic name>->action=READ
+  */
     authorizables = new ArrayList<TAuthorizable>();
     authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
     authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), 
consumerGroup.getName()));
     addPermissions(role, group, KafkaActionConstant.READ, authorizables);
     try {
-      testConsume("user1");
+      testConsume(TOPIC_NAME, StaticUserGroupRole.USER_1);
       Assert.fail("user1 must not have been authorized to read from topic " + 
TOPIC_NAME + ".");
     } catch (Exception ex) {
       assertCausedMessage(ex, "Not authorized to access topics: [" + 
TOPIC_NAME + "]");
     }
-
-    // HOST=localhost->Topic=tOpIc1->action=read
-    authorizables = new ArrayList<TAuthorizable>();
-    authorizables.add(new TAuthorizable(host.getTypeName(), host.getName()));
-    authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName()));
-    addPermissions(role, group, KafkaActionConstant.READ, authorizables);
-    testConsume("user1");
   }
 
-  private void addPermissions(String role, String group, String action, 
ArrayList<TAuthorizable> authorizables) throws Exception {
+  private void addPermissions(String role, String group, String action, 
ArrayList<TAuthorizable> authorizables)
+          throws Exception {
     SentryGenericServiceClient sentryClient = getSentryClient();
     try {
       sentryClient.createRoleIfNotExist(ADMIN_USER, role, COMPONENT);
       sentryClient.addRoleToGroups(ADMIN_USER, role, COMPONENT, 
Sets.newHashSet(group));
 
       sentryClient.grantPrivilege(ADMIN_USER, role, COMPONENT,
-          new TSentryPrivilege(COMPONENT, "kafka", authorizables,
-              action));
+              new TSentryPrivilege(COMPONENT, "kafka", authorizables,
+                      action));
     } finally {
       if (sentryClient != null) {
         sentryClient.close();
@@ -182,11 +325,11 @@ public class TestAuthorize extends 
AbstractKafkaSentryTestBase {
     sleepIfCachingEnabled();
   }
 
-  private void testProduce(String producerUser) throws Exception {
+  private void testProduce(String topic, String producerUser) throws Exception 
{
     final KafkaProducer<String, String> kafkaProducer = 
createKafkaProducer(producerUser);
     try {
       final String msg = "message1";
-      ProducerRecord<String, String> producerRecord = new 
ProducerRecord<String, String>(TOPIC_NAME, msg);
+      ProducerRecord<String, String> producerRecord = new 
ProducerRecord<String, String>(topic, msg);
       kafkaProducer.send(producerRecord).get();
       LOGGER.debug("Sent message: " + producerRecord);
     } finally {
@@ -194,11 +337,11 @@ public class TestAuthorize extends 
AbstractKafkaSentryTestBase {
     }
   }
 
-  private void testConsume(String consumerUser) throws Exception {
+  private void testConsume(String topic, String consumerUser) throws Exception 
{
     final KafkaConsumer<String, String> kafkaConsumer = 
createKafkaConsumer(consumerUser);
     try {
       final String msg = "message1";
-      kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME), new 
CustomRebalanceListener(kafkaConsumer));
+      kafkaConsumer.subscribe(Collections.singletonList(topic), new 
CustomRebalanceListener(kafkaConsumer));
       waitTillTrue("Did not receive expected message.", 60, 2, new 
Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
@@ -268,8 +411,8 @@ public class TestAuthorize extends 
AbstractKafkaSentryTestBase {
    * @throws Exception
    */
   private void waitTillTrue(
-      String failureMessage, long maxWaitTime, long loopInterval, 
Callable<Boolean> testFunc)
-      throws Exception {
+          String failureMessage, long maxWaitTime, long loopInterval, 
Callable<Boolean> testFunc)
+          throws Exception {
     long startTime = System.currentTimeMillis();
     while (System.currentTimeMillis() - startTime <= maxWaitTime * 1000L) {
       if (testFunc.call()) {
@@ -296,9 +439,7 @@ public class TestAuthorize extends 
AbstractKafkaSentryTestBase {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> collection) {
-      for (TopicPartition tp : collection) {
-        consumer.seekToBeginning(tp);
-      }
+      consumer.seekToBeginning(collection);
     }
   }
 }

Reply via email to