Hangleton commented on code in PR #13240: URL: https://github.com/apache/kafka/pull/13240#discussion_r1106229439
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -425,35 +425,72 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) CompletableFuture.completedFuture[Unit](()) } else { + val topicNames = + if (offsetCommitRequest.version() >= 9) + metadataCache.topicIdsToNames() + else + Collections.emptyMap[Uuid, String]() + + // For version < 9, lookup from topicNames fails and the topic name (which cannot be null) is returned. + // For version >= 9, if lookup from topicNames fails, there are two possibilities: + // + // a) The topic ID was left to default and the topic name should have been populated as a fallback instead. + // If none was provided, null is returned. + // + // b) The topic ID was not default but is not present in the local topic IDs cache. In this case, because + // clients should make exclusive use of topic name or topic ID, the topic name should be null. If however + // the client provided a topic name, we do not want to use it, because any topic with the same name + // present locally would then have a topic ID which does not match the topic ID in the request. + def resolveTopicName(topic: OffsetCommitRequestData.OffsetCommitRequestTopic): String = { + val resolvedFromId = topicNames.get(topic.topicId()) + if (resolvedFromId != null) + resolvedFromId + else if (offsetCommitRequest.version() < 9 || Uuid.ZERO_UUID.equals(topic.topicId)) { + topic.name() + } else { + null + } + } + val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, TOPIC, - offsetCommitRequest.data.topics.asScala - )(_.name) + offsetCommitRequest.data.topics.asScala.filter(topic => resolveTopicName(topic) != null) Review Comment: I replaced this approach with a single iteration over the list of topic data, resolving and populating the topic name in place (line #455). I am concerned though because this involved mutating the request's body. But I am also concerned about the cost of creating a new ArrayBuffer, Sequence or another data structure to pre-filter. Without falling into premature optimization, what do you think about in-place mutation? I think the problem here is that we have the instantiation of the `OffsetCommitRequest` decoupled from the resolution of topic IDs. It makes sense since the former corresponds to the request deserialization while the latter corresponds to added semantics unconveyed by the request itself. In responsibility chains on server request handlers, one pattern sometimes adopted is to decorate a request with extraneous information which fall beyond the scope of ser/de. I wonder if topic id resolution could happen before passing it to the business request handler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org