dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1105854122


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -425,35 +425,72 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, 
offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val topicNames =
+        if (offsetCommitRequest.version() >= 9)
+          metadataCache.topicIdsToNames()
+        else
+          Collections.emptyMap[Uuid, String]()
+
+      // For version < 9, lookup from topicNames fails and the topic name 
(which cannot be null) is returned.
+      // For version >= 9, if lookup from topicNames fails, there are two 
possibilities:
+      //
+      // a) The topic ID was left to default and the topic name should have 
been populated as a fallback instead.
+      //    If none was provided, null is returned.
+      //
+      // b) The topic ID was not default but is not present in the local topic 
IDs cache. In this case, because
+      //    clients should make exclusive use of topic name or topic ID, the 
topic name should be null. If however
+      //    the client provided a topic name, we do not want to use it, 
because any topic with the same name
+      //    present locally would then have a topic ID which does not match 
the topic ID in the request.
+      def resolveTopicName(topic: 
OffsetCommitRequestData.OffsetCommitRequestTopic): String = {
+          val resolvedFromId = topicNames.get(topic.topicId())
+          if (resolvedFromId != null)
+            resolvedFromId
+          else if (offsetCommitRequest.version() < 9 || 
Uuid.ZERO_UUID.equals(topic.topicId)) {
+            topic.name()
+          } else {
+            null
+          }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
         TOPIC,
-        offsetCommitRequest.data.topics.asScala
-      )(_.name)
+        offsetCommitRequest.data.topics.asScala.filter(topic => 
resolveTopicName(topic) != null)

Review Comment:
   We are calling `resolveTopicName` three times. I think that it would be 
better to iterate once over the topics to resolve the topic ids and build the 
list of topic names (if one was found) while doing this. Then, we can check the 
authorization and do the rest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to