This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 7225912  KAFKA-7287: Set open ACL for old consumer znode path (#5503) (#5585)
7225912 is described below

commit 7225912bc1652092695eae6df59056e754c36ef4
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sat Sep 1 00:24:36 2018 +0530

    KAFKA-7287: Set open ACL for old consumer znode path (#5503) (#5585)
    
    Reviewers: Sriharsha Chintalapani <[email protected]>, Satish Duggana <[email protected]>, Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/zk/ZkData.scala                    | 12 +++++++++---
 .../scala/unit/kafka/security/auth/ZkAuthorizationTest.scala | 12 ++++++++++--
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 03f4a05..cff32a2 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -407,8 +407,13 @@ object PreferredReplicaElectionZNode {
   }.map(_.toSet).getOrElse(Set.empty)
 }
 
+//old consumer path znode
+object ConsumerPathZNode {
+  def path = "/consumers"
+}
+
 object ConsumerOffset {
-  def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}"
+  def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
   def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
   def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
 }
@@ -536,7 +541,7 @@ object ZkData {
 
   // These are persistent ZK paths that should exist on kafka broker startup.
   val PersistentZkPaths = Seq(
-    "/consumers", // old consumer path
+    ConsumerPathZNode.path, // old consumer path
     BrokerIdsZNode.path,
     TopicsZNode.path,
     ConfigEntityChangeNotificationZNode.path,
@@ -558,7 +563,8 @@ object ZkData {
   }
 
   def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
-    if (isSecure) {
+    //Old Consumer path is kept open as different consumers will write under this node.
+    if (!ConsumerPathZNode.path.equals(path) && isSecure) {
       val acls = new ArrayBuffer[ACL]
       acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
       if (!sensitivePath(path))
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 033ca67..a80c407 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,10 +19,10 @@ package kafka.security.auth
 
 import kafka.admin.ZkSecurityMigrator
 import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL}
+import org.apache.zookeeper.data.{ACL, Stat}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
@@ -299,4 +299,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
         }
     }
   }
+
+  @Test
+  def testConsumerOffsetPathAcls(): Unit = {
+    zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path)
+
+    val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat())
+    assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure))
+  }
 }

Reply via email to