This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 7fc59b0ce96 KAFKA-18519: Remove Json.scala, cleanup AclEntry.scala
(#18614)
7fc59b0ce96 is described below
commit 7fc59b0ce96d83b287d299e3bd07e9dfede2ec1f
Author: Ken Huang <[email protected]>
AuthorDate: Wed Jan 22 23:12:06 2025 +0800
KAFKA-18519: Remove Json.scala, cleanup AclEntry.scala (#18614)
Reviewers: Mickael Maison <[email protected]>, Ismael Juma
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/utils/Json.scala | 92 ---------
.../kafka/security/authorizer/AclEntryTest.scala | 48 -----
.../src/test/scala/unit/kafka/utils/JsonTest.scala | 134 -------------
.../unit/kafka/utils/json/JsonValueTest.scala | 212 ---------------------
gradle/spotbugs-exclude.xml | 7 -
.../apache/kafka/jmh/acl/AuthorizerBenchmark.java | 39 ++--
.../apache/kafka/security/authorizer/AclEntry.java | 122 +-----------
7 files changed, 17 insertions(+), 637 deletions(-)
diff --git a/core/src/main/scala/kafka/utils/Json.scala
b/core/src/main/scala/kafka/utils/Json.scala
deleted file mode 100644
index 049941cd01d..00000000000
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.utils
-
-import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException}
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.databind.node.MissingNode
-import kafka.utils.json.JsonValue
-
-import scala.reflect.ClassTag
-
-/**
- * Provides methods for parsing JSON with Jackson and encoding to JSON with a
simple and naive custom implementation.
- */
-object Json {
-
- private val mapper = new ObjectMapper()
-
- /**
- * Parse a JSON string into a JsonValue if possible. `None` is returned if
`input` is not valid JSON.
- */
- def parseFull(input: String): Option[JsonValue] =
tryParseFull(input).toOption
-
- /**
- * Parse a JSON string into either a generic type T, or a
JsonProcessingException in the case of
- * exception.
- */
- def parseStringAs[T](input: String)(implicit tag: ClassTag[T]):
Either[JsonProcessingException, T] = {
- try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
- catch { case e: JsonProcessingException => Left(e) }
- }
-
- /**
- * Parse a JSON byte array into a JsonValue if possible. `None` is returned
if `input` is not valid JSON.
- */
- def parseBytes(input: Array[Byte]): Option[JsonValue] =
- try Option(mapper.readTree(input)).map(JsonValue(_))
- catch { case _: JsonProcessingException => None }
-
- def tryParseBytes(input: Array[Byte]): Either[JsonProcessingException,
JsonValue] =
- try Right(mapper.readTree(input)).map(JsonValue(_))
- catch { case e: JsonProcessingException => Left(e) }
-
- /**
- * Parse a JSON byte array into either a generic type T, or a
JsonProcessingException in the case of exception.
- */
- def parseBytesAs[T](input: Array[Byte])(implicit tag: ClassTag[T]):
Either[JsonProcessingException, T] = {
- try Right(mapper.readValue(input, tag.runtimeClass).asInstanceOf[T])
- catch { case e: JsonProcessingException => Left(e) }
- }
-
- /**
- * Parse a JSON string into a JsonValue if possible. It returns an `Either`
where `Left` will be an exception and
- * `Right` is the `JsonValue`.
- * @param input a JSON string to parse
- * @return An `Either` which in case of `Left` means an exception and
`Right` is the actual return value.
- */
- def tryParseFull(input: String): Either[JsonProcessingException, JsonValue] =
- if (input == null || input.isEmpty)
- Left(new JsonParseException(MissingNode.getInstance().traverse(), "The
input string shouldn't be empty"))
- else
- try Right(mapper.readTree(input)).map(JsonValue(_))
- catch { case e: JsonProcessingException => Left(e) }
-
- /**
- * Encode an object into a JSON string. This method accepts any type
supported by Jackson's ObjectMapper in
- * the default configuration. That is, Java collections are supported, but
Scala collections are not (to avoid
- * a jackson-scala dependency).
- */
- def encodeAsString(obj: Any): String = mapper.writeValueAsString(obj)
-
- /**
- * Encode an object into a JSON value in bytes. This method accepts any type
supported by Jackson's ObjectMapper in
- * the default configuration. That is, Java collections are supported, but
Scala collections are not (to avoid
- * a jackson-scala dependency).
- */
- def encodeAsBytes(obj: Any): Array[Byte] = mapper.writeValueAsBytes(obj)
-}
diff --git
a/core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala
b/core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala
deleted file mode 100644
index f6867b6233b..00000000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclEntryTest.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.authorizer
-
-import java.nio.charset.StandardCharsets.UTF_8
-import kafka.utils.Json
-import org.apache.kafka.common.acl.AccessControlEntry
-import org.apache.kafka.common.acl.AclOperation.READ
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.security.authorizer.AclEntry
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import java.util
-
-class AclEntryTest {
-
- val AclJson = """{"version": 1, "acls": [{"host": "host1","permissionType":
"Deny","operation": "READ", "principal": "User:alice" },
- { "host": "*" , "permissionType": "Allow", "operation": "Read",
"principal": "User:bob" },
- { "host": "host1", "permissionType": "Deny", "operation": "Read" ,
"principal": "User:bob"}]}"""
-
- @Test
- def testAclJsonConversion(): Unit = {
- val acl1 = new AclEntry(new AccessControlEntry(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice").toString, "host1", READ,
DENY))
- val acl2 = new AclEntry(new AccessControlEntry(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob").toString, "*", READ, ALLOW))
- val acl3 = new AclEntry(new AccessControlEntry(new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob").toString, "host1", READ, DENY))
-
- val acls = new util.HashSet[AclEntry](util.Arrays.asList(acl1, acl2, acl3))
-
- assertEquals(acls,
AclEntry.fromBytes(Json.encodeAsBytes(AclEntry.toJsonCompatibleMap(acls))))
- assertEquals(acls, AclEntry.fromBytes(AclJson.getBytes(UTF_8)))
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
b/core/src/test/scala/unit/kafka/utils/JsonTest.scala
deleted file mode 100644
index aca7d45600d..00000000000
--- a/core/src/test/scala/unit/kafka/utils/JsonTest.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.utils
-
-import java.nio.charset.StandardCharsets
-
-import com.fasterxml.jackson.annotation.JsonProperty
-import com.fasterxml.jackson.core.{JsonParseException, JsonProcessingException}
-import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.node._
-import kafka.utils.JsonTest.TestObject
-import kafka.utils.json.JsonValue
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-import scala.collection.Map
-
-object JsonTest {
- case class TestObject(@JsonProperty("foo") foo: String, @JsonProperty("bar")
bar: Int)
-}
-
-class JsonTest {
-
- @Test
- def testJsonParse(): Unit = {
- val jnf = JsonNodeFactory.instance
-
- assertEquals(Some(JsonValue(new ObjectNode(jnf))), Json.parseFull("{}"))
- assertEquals(Right(JsonValue(new ObjectNode(jnf))),
Json.tryParseFull("{}"))
- assertEquals(classOf[Left[JsonProcessingException, JsonValue]],
Json.tryParseFull(null).getClass)
- assertThrows(classOf[IllegalArgumentException], () =>
Json.tryParseBytes(null))
-
- assertEquals(None, Json.parseFull(""))
- assertEquals(classOf[Left[JsonProcessingException, JsonValue]],
Json.tryParseFull("").getClass)
-
- assertEquals(None, Json.parseFull("""{"foo":"bar"s}"""))
- val tryRes = Json.tryParseFull("""{"foo":"bar"s}""")
- assertTrue(tryRes.isInstanceOf[Left[_, JsonValue]])
-
- val objectNode = new ObjectNode(
- jnf,
- Map[String, JsonNode]("foo" -> new TextNode("bar"), "is_enabled" ->
BooleanNode.TRUE).asJava
- )
- assertEquals(Some(JsonValue(objectNode)), Json.parseFull("""{"foo":"bar",
"is_enabled":true}"""))
- assertEquals(Right(JsonValue(objectNode)),
Json.tryParseFull("""{"foo":"bar", "is_enabled":true}"""))
-
- val arrayNode = new ArrayNode(jnf)
- Vector(1, 2, 3).map(new IntNode(_)).foreach(arrayNode.add)
- assertEquals(Some(JsonValue(arrayNode)), Json.parseFull("[1, 2, 3]"))
-
- // Test with encoder that properly escapes backslash and quotes
- val map = Map("foo1" -> """bar1\,bar2""", "foo2" -> """\bar""").asJava
- val encoded = Json.encodeAsString(map)
- val decoded = Json.parseFull(encoded)
- assertEquals(decoded, Json.parseFull("""{"foo1":"bar1\\,bar2",
"foo2":"\\bar"}"""))
- }
-
- @Test
- def testEncodeAsString(): Unit = {
- assertEquals("null", Json.encodeAsString(null))
- assertEquals("1", Json.encodeAsString(1))
- assertEquals("1", Json.encodeAsString(1L))
- assertEquals("1", Json.encodeAsString(1.toByte))
- assertEquals("1", Json.encodeAsString(1.toShort))
- assertEquals("1.0", Json.encodeAsString(1.0))
- assertEquals(""""str"""", Json.encodeAsString("str"))
- assertEquals("true", Json.encodeAsString(true))
- assertEquals("false", Json.encodeAsString(false))
- assertEquals("[]", Json.encodeAsString(Seq().asJava))
- assertEquals("[null]", Json.encodeAsString(Seq(null).asJava))
- assertEquals("[1,2,3]", Json.encodeAsString(Seq(1,2,3).asJava))
- assertEquals("""[1,"2",[3],null]""",
Json.encodeAsString(Seq(1,"2",Seq(3).asJava,null).asJava))
- assertEquals("{}", Json.encodeAsString(Map().asJava))
- assertEquals("""{"a":1,"b":2,"c":null}""", Json.encodeAsString(Map("a" ->
1, "b" -> 2, "c" -> null).asJava))
- assertEquals("""{"a":[1,2],"c":[3,4]}""", Json.encodeAsString(Map("a" ->
Seq(1,2).asJava, "c" -> Seq(3,4).asJava).asJava))
- assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""",
Json.encodeAsString(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava, "c" ->
null).asJava))
- assertEquals(""""str1\\,str2"""", Json.encodeAsString("""str1\,str2"""))
- assertEquals(""""\"quoted\""""", Json.encodeAsString(""""quoted""""))
- }
-
- @Test
- def testEncodeAsBytes(): Unit = {
- assertEquals("null", new String(Json.encodeAsBytes(null),
StandardCharsets.UTF_8))
- assertEquals("1", new String(Json.encodeAsBytes(1),
StandardCharsets.UTF_8))
- assertEquals("1", new String(Json.encodeAsBytes(1L),
StandardCharsets.UTF_8))
- assertEquals("1", new String(Json.encodeAsBytes(1.toByte),
StandardCharsets.UTF_8))
- assertEquals("1", new String(Json.encodeAsBytes(1.toShort),
StandardCharsets.UTF_8))
- assertEquals("1.0", new String(Json.encodeAsBytes(1.0),
StandardCharsets.UTF_8))
- assertEquals(""""str"""", new String(Json.encodeAsBytes("str"),
StandardCharsets.UTF_8))
- assertEquals("true", new String(Json.encodeAsBytes(true),
StandardCharsets.UTF_8))
- assertEquals("false", new String(Json.encodeAsBytes(false),
StandardCharsets.UTF_8))
- assertEquals("[]", new String(Json.encodeAsBytes(Seq().asJava),
StandardCharsets.UTF_8))
- assertEquals("[null]", new String(Json.encodeAsBytes(Seq(null).asJava),
StandardCharsets.UTF_8))
- assertEquals("[1,2,3]", new String(Json.encodeAsBytes(Seq(1,2,3).asJava),
StandardCharsets.UTF_8))
- assertEquals("""[1,"2",[3],null]""", new
String(Json.encodeAsBytes(Seq(1,"2",Seq(3).asJava,null).asJava),
StandardCharsets.UTF_8))
- assertEquals("{}", new String(Json.encodeAsBytes(Map().asJava),
StandardCharsets.UTF_8))
- assertEquals("""{"a":1,"b":2,"c":null}""", new
String(Json.encodeAsBytes(Map("a" -> 1, "b" -> 2, "c" -> null).asJava),
StandardCharsets.UTF_8))
- assertEquals("""{"a":[1,2],"c":[3,4]}""", new
String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "c" ->
Seq(3,4).asJava).asJava), StandardCharsets.UTF_8))
- assertEquals("""{"a":[1,2],"b":[3,4],"c":null}""", new
String(Json.encodeAsBytes(Map("a" -> Seq(1,2).asJava, "b" -> Seq(3,4).asJava,
"c" -> null).asJava), StandardCharsets.UTF_8))
- assertEquals(""""str1\\,str2"""", new
String(Json.encodeAsBytes("""str1\,str2"""), StandardCharsets.UTF_8))
- assertEquals(""""\"quoted\""""", new
String(Json.encodeAsBytes(""""quoted""""), StandardCharsets.UTF_8))
- }
-
- @Test
- def testParseTo(): Unit = {
- val foo = "baz"
- val bar = 1
-
- val result = Json.parseStringAs[TestObject](s"""{"foo": "$foo", "bar":
$bar}""")
-
- assertEquals(Right(TestObject(foo, bar)), result)
- }
-
- @Test
- def testParseToWithInvalidJson(): Unit = {
- val result = Json.parseStringAs[TestObject]("{invalid json}")
- assertEquals(Left(classOf[JsonParseException]),
result.left.map(_.getClass))
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
b/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
deleted file mode 100644
index 8194b298b4e..00000000000
--- a/core/src/test/scala/unit/kafka/utils/json/JsonValueTest.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils.json
-
-import scala.collection.Seq
-
-import com.fasterxml.jackson.databind.{ObjectMapper, JsonMappingException}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions._
-
-import kafka.utils.Json
-
-class JsonValueTest {
-
- private val json = """
- |{
- | "boolean": false,
- | "int": 1234,
- | "long": 3000000000,
- | "double": 16.244355,
- | "string": "string",
- | "number_as_string": "123",
- | "array": [4.0, 11.1, 44.5],
- | "object": {
- | "a": true,
- | "b": false
- | },
- | "null": null
- |}
- """.stripMargin
-
- private def parse(s: String): JsonValue =
- Json.parseFull(s).getOrElse(sys.error("Failed to parse json: " + s))
-
- private def assertTo[T: DecodeJson](expected: T, jsonValue: JsonObject =>
JsonValue): Unit = {
- val parsed = jsonValue(parse(json).asJsonObject)
- assertEquals(Right(expected), parsed.toEither[T])
- assertEquals(expected, parsed.to[T])
- }
-
- private def assertToFails[T: DecodeJson](jsonValue: JsonObject =>
JsonValue): Unit = {
- val parsed = jsonValue(parse(json).asJsonObject)
- assertTrue(parsed.toEither[T].isLeft)
- assertThrow[JsonMappingException](parsed.to[T])
- }
-
- def assertThrow[E <: Throwable : Manifest](body: => Unit): Unit = {
- import scala.util.control.Exception._
- val klass = manifest[E].runtimeClass
- catchingPromiscuously(klass).opt(body).foreach { _ =>
- fail("Expected `" + klass + "` to be thrown, but no exception was
thrown")
- }
- }
-
- @Test
- def testAsJsonObject(): Unit = {
- val parsed = parse(json).asJsonObject
- val obj = parsed("object")
- assertEquals(obj, obj.asJsonObject)
- assertThrow[JsonMappingException](parsed("array").asJsonObject)
- }
-
- @Test
- def testAsJsonObjectOption(): Unit = {
- val parsed = parse(json).asJsonObject
- assertTrue(parsed("object").asJsonObjectOption.isDefined)
- assertEquals(None, parsed("array").asJsonObjectOption)
- }
-
- @Test
- def testAsJsonArray(): Unit = {
- val parsed = parse(json).asJsonObject
- val array = parsed("array")
- assertEquals(array, array.asJsonArray)
- assertThrow[JsonMappingException](parsed("object").asJsonArray)
- }
-
- @Test
- def testAsJsonArrayOption(): Unit = {
- val parsed = parse(json).asJsonObject
- assertTrue(parsed("array").asJsonArrayOption.isDefined)
- assertEquals(None, parsed("object").asJsonArrayOption)
- }
-
- @Test
- def testJsonObjectGet(): Unit = {
- val parsed = parse(json).asJsonObject
- assertEquals(Some(parse("""{"a":true,"b":false}""")), parsed.get("object"))
- assertEquals(None, parsed.get("aaaaa"))
- }
-
- @Test
- def testJsonObjectApply(): Unit = {
- val parsed = parse(json).asJsonObject
- assertEquals(parse("""{"a":true,"b":false}"""), parsed("object"))
- assertThrow[JsonMappingException](parsed("aaaaaaaa"))
- }
-
- @Test
- def testJsonObjectIterator(): Unit = {
- assertEquals(
- Vector("a" -> parse("true"), "b" -> parse("false")),
- parse(json).asJsonObject("object").asJsonObject.iterator.toVector
- )
- }
-
- @Test
- def testJsonArrayIterator(): Unit = {
- assertEquals(Vector("4.0", "11.1", "44.5").map(parse),
parse(json).asJsonObject("array").asJsonArray.iterator.toVector)
- }
-
- @Test
- def testJsonValueEquals(): Unit = {
-
- assertEquals(parse(json), parse(json))
-
- assertEquals(parse("""{"blue": true, "red": false}"""), parse("""{"red":
false, "blue": true}"""))
- assertNotEquals(parse("""{"blue": true, "red": true}"""), parse("""{"red":
false, "blue": true}"""))
-
- assertEquals(parse("""[1, 2, 3]"""), parse("""[1, 2, 3]"""))
- assertNotEquals(parse("""[1, 2, 3]"""), parse("""[2, 1, 3]"""))
-
- assertEquals(parse("1344"), parse("1344"))
- assertNotEquals(parse("1344"), parse("144"))
-
- }
-
- @Test
- def testJsonValueHashCode(): Unit = {
- assertEquals(new ObjectMapper().readTree(json).hashCode,
parse(json).hashCode)
- }
-
- @Test
- def testJsonValueToString(): Unit = {
- val js =
"""{"boolean":false,"int":1234,"array":[4.0,11.1,44.5],"object":{"a":true,"b":false}}"""
- assertEquals(js, parse(js).toString)
- }
-
- @Test
- def testDecodeBoolean(): Unit = {
- assertTo[Boolean](false, _("boolean"))
- assertToFails[Boolean](_("int"))
- }
-
- @Test
- def testDecodeString(): Unit = {
- assertTo[String]("string", _("string"))
- assertTo[String]("123", _("number_as_string"))
- assertToFails[String](_("int"))
- assertToFails[String](_("array"))
- }
-
- @Test
- def testDecodeInt(): Unit = {
- assertTo[Int](1234, _("int"))
- assertToFails[Int](_("long"))
- }
-
- @Test
- def testDecodeLong(): Unit = {
- assertTo[Long](3000000000L, _("long"))
- assertTo[Long](1234, _("int"))
- assertToFails[Long](_("string"))
- }
-
- @Test
- def testDecodeDouble(): Unit = {
- assertTo[Double](16.244355, _("double"))
- assertTo[Double](1234.0, _("int"))
- assertTo[Double](3000000000L, _("long"))
- assertToFails[Double](_("string"))
- }
-
- @Test
- def testDecodeSeq(): Unit = {
- assertTo[Seq[Double]](Seq(4.0, 11.1, 44.5), _("array"))
- assertToFails[Seq[Double]](_("string"))
- assertToFails[Seq[Double]](_("object"))
- assertToFails[Seq[String]](_("array"))
- }
-
- @Test
- def testDecodeMap(): Unit = {
- assertTo[Map[String, Boolean]](Map("a" -> true, "b" -> false), _("object"))
- assertToFails[Map[String, Int]](_("object"))
- assertToFails[Map[String, String]](_("object"))
- assertToFails[Map[String, Double]](_("array"))
- }
-
- @Test
- def testDecodeOption(): Unit = {
- assertTo[Option[Int]](None, _("null"))
- assertTo[Option[Int]](Some(1234), _("int"))
- assertToFails[Option[String]](_("int"))
- }
-
-}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 3f0f9efd165..2de3d64e61e 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -151,13 +151,6 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Bug pattern="EQ_UNUSUAL"/>
</Match>
- <Match>
- <!-- Add a suppression for auto-generated calls to instanceof in
kafka.utils.Json -->
- <Source name="Json.scala"/>
- <Package name="kafka.utils"/>
- <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
- </Match>
-
<Match>
<!-- A spurious null check after inlining by the scalac optimizer
confuses spotBugs -->
<Class name="kafka.log.Log"/>
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
index 051d4d7724f..d2fb9018df2 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/acl/AuthorizerBenchmark.java
@@ -35,7 +35,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
-import org.apache.kafka.security.authorizer.AclEntry;
import org.apache.kafka.server.authorizer.Action;
import org.openjdk.jmh.annotations.Benchmark;
@@ -113,14 +112,14 @@ public class AuthorizerBenchmark {
}
private void prepareAclCache() {
- Map<ResourcePattern, Set<AclEntry>> aclEntries = new HashMap<>();
+ Map<ResourcePattern, Set<AccessControlEntry>> aclEntries = new
HashMap<>();
for (int resourceId = 0; resourceId < resourceCount; resourceId++) {
ResourcePattern resource = new ResourcePattern(
(resourceId % 10 == 0) ? ResourceType.GROUP :
ResourceType.TOPIC,
resourceNamePrefix + resourceId,
(resourceId % 5 == 0) ? PatternType.PREFIXED :
PatternType.LITERAL);
- Set<AclEntry> entries = aclEntries.computeIfAbsent(resource, k ->
new HashSet<>());
+ Set<AccessControlEntry> entries =
aclEntries.computeIfAbsent(resource, k -> new HashSet<>());
for (int aclId = 0; aclId < aclCount; aclId++) {
// The principal in the request context we are using
@@ -129,36 +128,31 @@ public class AuthorizerBenchmark {
AccessControlEntry allowAce = new AccessControlEntry(
principalName, "*", AclOperation.READ,
AclPermissionType.ALLOW);
- entries.add(new AclEntry(allowAce));
+ entries.add(new AccessControlEntry(allowAce.principal(),
allowAce.host(), allowAce.operation(), allowAce.permissionType()));
if (shouldDeny()) {
- // dominantly deny the resource
- AccessControlEntry denyAce = new AccessControlEntry(
- principalName, "*", AclOperation.READ,
AclPermissionType.DENY);
- entries.add(new AclEntry(denyAce));
+ entries.add(new AccessControlEntry(principalName, "*",
AclOperation.READ, AclPermissionType.DENY));
}
}
}
ResourcePattern resourcePrefix = new
ResourcePattern(ResourceType.TOPIC, resourceNamePrefix,
PatternType.PREFIXED);
- Set<AclEntry> entriesPrefix =
aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
+ Set<AccessControlEntry> entriesPrefix =
aclEntries.computeIfAbsent(resourcePrefix, k -> new HashSet<>());
for (int hostId = 0; hostId < hostPreCount; hostId++) {
AccessControlEntry allowAce = new
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
AclOperation.READ, AclPermissionType.ALLOW);
- entriesPrefix.add(new AclEntry(allowAce));
+ entriesPrefix.add(new AccessControlEntry(allowAce.principal(),
allowAce.host(), allowAce.operation(), allowAce.permissionType()));
if (shouldDeny()) {
- // dominantly deny the resource
- AccessControlEntry denyAce = new
AccessControlEntry(principal.toString(), "127.0.0." + hostId,
- AclOperation.READ, AclPermissionType.DENY);
- entriesPrefix.add(new AclEntry(denyAce));
+ entriesPrefix.add(new AccessControlEntry(principal.toString(),
"127.0.0." + hostId,
+ AclOperation.READ, AclPermissionType.DENY));
}
}
ResourcePattern resourceWildcard = new
ResourcePattern(ResourceType.TOPIC, ResourcePattern.WILDCARD_RESOURCE,
PatternType.LITERAL);
- Set<AclEntry> entriesWildcard =
aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
+ Set<AccessControlEntry> entriesWildcard =
aclEntries.computeIfAbsent(resourceWildcard, k -> new HashSet<>());
// get dynamic entries number for wildcard acl
for (int hostId = 0; hostId < resourceCount / 10; hostId++) {
String hostName = "127.0.0" + hostId;
@@ -170,23 +164,22 @@ public class AuthorizerBenchmark {
AccessControlEntry allowAce = new
AccessControlEntry(principal.toString(), hostName,
AclOperation.READ, AclPermissionType.ALLOW);
- entriesWildcard.add(new AclEntry(allowAce));
+ entriesWildcard.add(new AccessControlEntry(allowAce.principal(),
allowAce.host(), allowAce.operation(), allowAce.permissionType()));
if (shouldDeny()) {
- AccessControlEntry denyAce = new
AccessControlEntry(principal.toString(), hostName,
- AclOperation.READ, AclPermissionType.DENY);
- entriesWildcard.add(new AclEntry(denyAce));
+ entriesWildcard.add(new
AccessControlEntry(principal.toString(), hostName,
+ AclOperation.READ, AclPermissionType.DENY));
}
}
setupAcls(aclEntries);
}
- private void setupAcls(Map<ResourcePattern, Set<AclEntry>> aclEntries) {
- for (Map.Entry<ResourcePattern, Set<AclEntry>> entryMap :
aclEntries.entrySet()) {
+ private void setupAcls(Map<ResourcePattern, Set<AccessControlEntry>>
aclEntries) {
+ for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entryMap :
aclEntries.entrySet()) {
ResourcePattern resourcePattern = entryMap.getKey();
- for (AclEntry aclEntry : entryMap.getValue()) {
- StandardAcl standardAcl = StandardAcl.fromAclBinding(new
AclBinding(resourcePattern, aclEntry));
+ for (AccessControlEntry accessControlEntry : entryMap.getValue()) {
+ StandardAcl standardAcl = StandardAcl.fromAclBinding(new
AclBinding(resourcePattern, accessControlEntry));
authorizer.addAcl(Uuid.randomUuid(), standardAcl);
}
authorizer.completeInitialLoad();
diff --git
a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index ea27ee6ea41..4b1b12aefdc 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -16,28 +16,15 @@
*/
package org.apache.kafka.security.authorizer;
-import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import org.apache.kafka.common.utils.SecurityUtils;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.util.Json;
-import org.apache.kafka.server.util.json.DecodeJson;
-import org.apache.kafka.server.util.json.JsonObject;
-import org.apache.kafka.server.util.json.JsonValue;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -54,99 +41,16 @@ import static
org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
-public class AclEntry extends AccessControlEntry {
- private static final DecodeJson.DecodeInteger INT = new
DecodeJson.DecodeInteger();
- private static final DecodeJson.DecodeString STRING = new
DecodeJson.DecodeString();
+public class AclEntry {
public static final KafkaPrincipal WILDCARD_PRINCIPAL = new
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
public static final String WILDCARD_PRINCIPAL_STRING =
WILDCARD_PRINCIPAL.toString();
public static final String WILDCARD_HOST = "*";
public static final String WILDCARD_RESOURCE =
ResourcePattern.WILDCARD_RESOURCE;
- public static final String RESOURCE_SEPARATOR = ":";
- public static final Set<ResourceType> RESOURCE_TYPES =
Arrays.stream(ResourceType.values())
- .filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
- .collect(Collectors.toSet());
public static final Set<AclOperation> ACL_OPERATIONS =
Arrays.stream(AclOperation.values())
.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
.collect(Collectors.toSet());
- private static final String PRINCIPAL_KEY = "principal";
- private static final String PERMISSION_TYPE_KEY = "permissionType";
- private static final String OPERATION_KEY = "operation";
- private static final String HOSTS_KEY = "host";
- public static final String VERSION_KEY = "version";
- public static final int CURRENT_VERSION = 1;
- private static final String ACLS_KEY = "acls";
-
- public final AccessControlEntry ace;
- public final KafkaPrincipal kafkaPrincipal;
-
- public AclEntry(AccessControlEntry ace) {
- super(ace.principal(), ace.host(), ace.operation(),
ace.permissionType());
- this.ace = ace;
-
- kafkaPrincipal = ace.principal() == null
- ? null
- : SecurityUtils.parseKafkaPrincipal(ace.principal());
- }
-
- /**
- * Parse JSON representation of ACLs
- * @param bytes of acls json string
- *
- * <p>
- {
- "version": 1,
- "acls": [
- {
- "host":"host1",
- "permissionType": "Deny",
- "operation": "Read",
- "principal": "User:alice"
- }
- ]
- }
- * </p>
- *
- * @return set of AclEntry objects from the JSON string
- */
- public static Set<AclEntry> fromBytes(byte[] bytes) throws IOException {
- if (bytes == null || bytes.length == 0)
- return Collections.emptySet();
-
- Optional<JsonValue> jsonValue = Json.parseBytes(bytes);
- if (!jsonValue.isPresent())
- return Collections.emptySet();
-
- JsonObject js = jsonValue.get().asJsonObject();
-
- //the acl json version.
- Utils.require(js.apply(VERSION_KEY).to(INT) == CURRENT_VERSION);
-
- Set<AclEntry> res = new HashSet<>();
-
- Iterator<JsonValue> aclsIter =
js.apply(ACLS_KEY).asJsonArray().iterator();
- while (aclsIter.hasNext()) {
- JsonObject itemJs = aclsIter.next().asJsonObject();
- KafkaPrincipal principal =
SecurityUtils.parseKafkaPrincipal(itemJs.apply(PRINCIPAL_KEY).to(STRING));
- AclPermissionType permissionType =
SecurityUtils.permissionType(itemJs.apply(PERMISSION_TYPE_KEY).to(STRING));
- String host = itemJs.apply(HOSTS_KEY).to(STRING);
- AclOperation operation =
SecurityUtils.operation(itemJs.apply(OPERATION_KEY).to(STRING));
-
- res.add(new AclEntry(new AccessControlEntry(principal.toString(),
- host, operation, permissionType)));
- }
-
- return res;
- }
-
- public static Map<String, Object> toJsonCompatibleMap(Set<AclEntry> acls) {
- Map<String, Object> res = new HashMap<>();
- res.put(AclEntry.VERSION_KEY, AclEntry.CURRENT_VERSION);
- res.put(AclEntry.ACLS_KEY,
acls.stream().map(AclEntry::toMap).collect(Collectors.toList()));
- return res;
- }
-
public static Set<AclOperation> supportedOperations(ResourceType
resourceType) {
switch (resourceType) {
case TOPIC:
@@ -182,28 +86,4 @@ public class AclEntry extends AccessControlEntry {
throw new IllegalArgumentException("Authorization error type
not known");
}
}
-
- public Map<String, Object> toMap() {
- Map<String, Object> res = new HashMap<>();
- res.put(AclEntry.PRINCIPAL_KEY, principal());
- res.put(AclEntry.PERMISSION_TYPE_KEY,
SecurityUtils.permissionTypeName(permissionType()));
- res.put(AclEntry.OPERATION_KEY,
SecurityUtils.operationName(operation()));
- res.put(AclEntry.HOSTS_KEY, host());
- return res;
- }
-
- @Override
- public int hashCode() {
- return ace.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- return super.equals(o); // to keep spotbugs happy
- }
-
- @Override
- public String toString() {
- return String.format("%s has %s permission for operations: %s from
hosts: %s", principal(), permissionType().name(), operation(), host());
- }
}