This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 7225912 KAFKA-7287: Set open ACL for old consumer znode path (#5503) (#5585)
7225912 is described below
commit 7225912bc1652092695eae6df59056e754c36ef4
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Sat Sep 1 00:24:36 2018 +0530
KAFKA-7287: Set open ACL for old consumer znode path (#5503) (#5585)
Reviewers: Sriharsha Chintalapani <[email protected]>, Satish Duggana
<[email protected]>, Jun Rao <[email protected]>
---
core/src/main/scala/kafka/zk/ZkData.scala | 12 +++++++++---
.../scala/unit/kafka/security/auth/ZkAuthorizationTest.scala | 12 ++++++++++--
2 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 03f4a05..cff32a2 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -407,8 +407,13 @@ object PreferredReplicaElectionZNode {
}.map(_.toSet).getOrElse(Set.empty)
}
+//old consumer path znode
+object ConsumerPathZNode {
+ def path = "/consumers"
+}
+
object ConsumerOffset {
- def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offsets/${topic}/${partition}"
+ def path(group: String, topic: String, partition: Integer) = s"${ConsumerPathZNode.path}/${group}/offsets/${topic}/${partition}"
def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
}
@@ -536,7 +541,7 @@ object ZkData {
// These are persistent ZK paths that should exist on kafka broker startup.
val PersistentZkPaths = Seq(
- "/consumers", // old consumer path
+ ConsumerPathZNode.path, // old consumer path
BrokerIdsZNode.path,
TopicsZNode.path,
ConfigEntityChangeNotificationZNode.path,
@@ -558,7 +563,8 @@ object ZkData {
}
def defaultAcls(isSecure: Boolean, path: String): Seq[ACL] = {
- if (isSecure) {
+ //Old Consumer path is kept open as different consumers will write under this node.
+ if (!ConsumerPathZNode.path.equals(path) && isSecure) {
val acls = new ArrayBuffer[ACL]
acls ++= ZooDefs.Ids.CREATOR_ALL_ACL.asScala
if (!sensitivePath(path))
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 033ca67..a80c407 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -19,10 +19,10 @@ package kafka.security.auth
import kafka.admin.ZkSecurityMigrator
import kafka.utils.{Logging, TestUtils, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{ConsumerPathZNode, ZooKeeperTestHarness}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.{ACL}
+import org.apache.zookeeper.data.{ACL, Stat}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
@@ -299,4 +299,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
}
}
}
+
+ @Test
+ def testConsumerOffsetPathAcls(): Unit = {
+ zkClient.makeSurePersistentPathExists(ConsumerPathZNode.path)
+
+ val consumerPathAcls = zkClient.currentZooKeeper.getACL(ConsumerPathZNode.path, new Stat())
+ assertTrue("old consumer znode path acls are not open", consumerPathAcls.asScala.forall(TestUtils.isAclUnsecure))
+ }
}