dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1113132735


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -59,7 +63,36 @@ public OffsetCommitRequest build(short version) {
                 throw new UnsupportedVersionException("The broker offset commit protocol version " +
                         version + " does not support usage of config group.instance.id.");
             }
-            return new OffsetCommitRequest(data, version);
+
+            // Copy since we can mutate it.
+            OffsetCommitRequestData requestData = data.duplicate();

Review Comment:
   nit: We probably don't need to duplicate `data` here. I understand why you 
are doing it but in practice we assume that `data` is owned by the builder once 
it is given to it.



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -78,11 +111,18 @@ public OffsetCommitRequestData data() {
         return data;
     }
 
-    public Map<TopicPartition, Long> offsets() {
+    public Map<TopicPartition, Long> offsets(TopicResolver topicResolver) {

Review Comment:
   I just realized that this is only used in tests. I wonder if we should just 
get rid of it and use the auto-generated classes in tests as well.



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -59,7 +63,36 @@ public OffsetCommitRequest build(short version) {
                 throw new UnsupportedVersionException("The broker offset commit protocol version " +
                         version + " does not support usage of config group.instance.id.");
             }
-            return new OffsetCommitRequest(data, version);
+
+            // Copy since we can mutate it.
+            OffsetCommitRequestData requestData = data.duplicate();
+
+            if (version >= 9) {
+                requestData.topics().forEach(topic -> {
+                    // Set the topic name to null if a topic ID for the topic is present. If no topic ID is
+                    // provided (i.e. its value is ZERO_UUID), the client should provide a topic name as a
+                    // fallback. This allows the OffsetCommit API to support both topic IDs and topic names
+                    // inside the same request or response.
+                    if (!Uuid.ZERO_UUID.equals(topic.topicId())) {
+                        topic.setName(null);
+                    } else if (topic.name() == null || "".equals(topic.name())) {
+                        // Fail-fast the entire request. This means that a single invalid topic in a multi-topic
+                        // request will make it fail. We may want to relax the constraint to allow the request
+                        // with valid topics (i.e. for which a valid ID or name was provided) exist in the request.
+                        throw new UnknownTopicOrPartitionException(

Review Comment:
   nit: InvalidRequestException would be more appropriate.



##########
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java:
##########
@@ -139,4 +151,65 @@ public void testVersionSupportForGroupInstanceId() {
             }
         }
     }
+
+    @Test
+    public void testHandlingOfTopicIdInAllVersions() {
+        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
+            OffsetCommitRequest request = new OffsetCommitRequest.Builder(data).build(version);
+            List<OffsetCommitRequestTopic> requestTopics = request.data().topics();
+
+            if (version >= 9) {
+                // Version >= 9:
+                //   Topic ID may be present or not. Both are valid cases. If no topic ID is provided (null or
+                //   set to ZERO_UUID), a topic name must be provided and will be used. If a topic ID is provided,
+                //   the name will be nullified.
+                assertNull(requestTopics.get(0).name());
+                assertEquals(topicOneId, requestTopics.get(0).topicId());
+
+                assertEquals(topicTwo, requestTopics.get(1).name());
+                assertEquals(Uuid.ZERO_UUID, requestTopics.get(1).topicId());
+
+            } else {
+                // Version < 9:
+                //   Topic ID may be present or not. They are set to ZERO_UUID in the finalized request. Any other
+                //   value would make serialization of the request fail.
+                assertEquals(topicOne, requestTopics.get(0).name());
+                assertEquals(Uuid.ZERO_UUID, requestTopics.get(0).topicId());
+
+                assertEquals(topicTwo, requestTopics.get(1).name());
+                assertEquals(Uuid.ZERO_UUID, requestTopics.get(1).topicId());
+            }
+        }
+    }
+
+    @Test
+    public void testTopicIdMustBeSetIfNoTopicNameIsProvided() {
+        OffsetCommitRequestTopic topic = new OffsetCommitRequestTopic()
+            .setPartitions(Collections.singletonList(requestPartitionOne));
+        OffsetCommitRequestData data = new OffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setTopics(Collections.singletonList(topic));
+
+        assertThrows(UnknownTopicOrPartitionException.class, () -> new OffsetCommitRequest.Builder(data).build((short) 9));
+    }
+
+    @Test
+    public void testResolvesTopicNameIfRequiredWhenListingOffsets() {
+        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {

Review Comment:
   nit: ditto (see the `@ParameterizedTest` suggestion on `testHandlingOfTopicIdInAllVersions`).



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -151,11 +154,12 @@ public Builder addPartition(
 
         public <P> Builder addPartitions(
             String topicName,
+            Uuid topicId,
             List<P> partitions,
             Function<P, Integer> partitionIndex,
             Errors error
         ) {
-            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName, topicId);

Review Comment:
   I think that there is a bug here for the case where multiple topic ids are 
unknown in a single request. For those, the topic name will be null, so they 
will all be aggregated into the same OffsetCommitResponseTopic, and that entry 
will carry the topic id of the first unknown topic seen.
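
To make the concern concrete, here is a minimal sketch of the aggregation, assuming the builder indexes topics by name only. `NameKeyedBuilderSketch` and its nested `Topic` are hypothetical stand-ins written for illustration (with `java.util.UUID` in place of Kafka's `Uuid`); they are not the actual `OffsetCommitResponse.Builder`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified model of a response builder that indexes topics by name only.
final class NameKeyedBuilderSketch {

    // Stand-in for OffsetCommitResponseTopic (not the generated Kafka class).
    static final class Topic {
        final String name;   // null when the topic ID could not be resolved
        final UUID topicId;
        final List<Integer> partitions = new ArrayList<>();

        Topic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    private final Map<String, Topic> byName = new LinkedHashMap<>();

    // Assumed behaviour: the lookup key is the name; the ID is only used on creation.
    Topic getOrCreateTopic(String name, UUID topicId) {
        Topic topic = byName.get(name);
        if (topic == null) {
            topic = new Topic(name, topicId);
            // A null name is a legal map key, so every unresolved topic collides here.
            byName.put(name, topic);
        }
        return topic;
    }

    void addPartitions(String name, UUID topicId, List<Integer> partitions) {
        getOrCreateTopic(name, topicId).partitions.addAll(partitions);
    }

    Collection<Topic> topics() {
        return byName.values();
    }
}
```

Calling `addPartitions(null, idA, ...)` followed by `addPartitions(null, idB, ...)` on this sketch yields a single topic entry carrying `idA`, so the partitions reported for `idB` end up attributed to the wrong topic id.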



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -192,8 +196,27 @@ public Builder merge(
             return this;
         }
 
-        public OffsetCommitResponse build() {
-            return new OffsetCommitResponse(data);
+        public OffsetCommitResponse build(short version) {
+            // Copy since we can mutate it.
+            OffsetCommitResponseData responseData = data.duplicate();
+
+            if (version >= 9) {
+                responseData.topics().forEach(topic -> {
+                    // Set the topic name to null if a topic ID for the topic is present.
+                    if (!Uuid.ZERO_UUID.equals(topic.topicId())) {
+                        topic.setName(null);
+                    }
+                });
+            } else {
+                responseData.topics().forEach(topic -> {
+                    // Topic must be set to default for version < 9.
+                    if (!Uuid.ZERO_UUID.equals(topic.topicId())) {
+                        topic.setTopicId(Uuid.ZERO_UUID);
+                    }
+                    // Topic name must not be null. Validity will be checked at serialization time.

Review Comment:
   nit: I think that we could remove this comment. It does not bring much.



##########
clients/src/main/resources/common/message/OffsetCommitResponse.json:
##########
@@ -28,15 +28,19 @@
   // Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 adds TopicId field (KIP-848).

Review Comment:
   The KIP also specifies new errors for this version. Could we mention them 
here?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -1369,7 +1387,7 @@ public void testJoinPrepareWithDisableAutoCommit() {
         try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
             coordinator.ensureActiveGroup();
 
-            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            prepareOffsetCommitRequest(new OffsetCommitResponseSpec().expectedOffsets(singletonMap(t1p, 100L)));

Review Comment:
   What's the reason for this change? If we refactor this, it may be better to 
directly go with the auto-generated data structures. 



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -425,35 +425,73 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val topicNames =
+        if (offsetCommitRequest.version() >= 9)
+          metadataCache.topicIdsToNames()
+        else
+          Collections.emptyMap[Uuid, String]()
+
+      // For version < 9, lookup from topicNames fails and the topic name (which cannot be null) is returned.
+      // For version >= 9, if lookup from topicNames fails, there are two possibilities:
+      //
+      // a) The topic ID was left to default and the topic name should have been populated as a fallback instead.
+      //    If none was provided, null is returned.
+      //
+      // b) The topic ID was not default but is not present in the local topic IDs cache. In this case, because
+      //    clients should make exclusive use of topic name or topic ID, the topic name should be null. If however
+      //    the client provided a topic name, we do not want to use it, because any topic with the same name
+      //    present locally would then have a topic ID which does not match the topic ID in the request.
+      def resolveTopicName(topic: OffsetCommitRequestData.OffsetCommitRequestTopic): Option[String] = {
+          val resolvedFromId = topicNames.get(topic.topicId())
+          if (resolvedFromId != null)
+            Some(resolvedFromId)
+          else if (offsetCommitRequest.version() < 9 || Uuid.ZERO_UUID.equals(topic.topicId)) {
+            Option(topic.name())
+          } else {
+            None
+          }
+      }
+
+      offsetCommitRequest.data.topics.forEach { topic => resolveTopicName(topic).foreach(topic.setName _) }

Review Comment:
   I would prefer to inline `resolveTopicName` and avoid allocating an `Option`, 
which does not bring much here.
   
   At the same time, I would directly construct the list of topic names for the 
authorizer at L461. This way, we could avoid re-iterating over the topics and 
the `filter`. What do you think?
   
   Moreover, the KIP states that an `INVALID_REQUEST` should be returned if both 
a topic id and a topic name are provided. We could also handle this here.
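
As a rough illustration of the single-pass idea, the sketch below resolves each topic once, collects the names to hand to the authorizer, and sets aside topics that carry both an id and a name. It is written in plain Java rather than against `KafkaApis`; `SinglePassResolutionSketch`, `RequestTopic`, `Outcome`, and `resolve` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the classification of a topic with neither id nor name as "unresolved" is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: resolve topic names and build the authorizer input in one pass.
final class SinglePassResolutionSketch {

    // Stand-in for Kafka's Uuid.ZERO_UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Stand-in for OffsetCommitRequestTopic.
    static final class RequestTopic {
        String name;
        final UUID topicId;

        RequestTopic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    static final class Outcome {
        final List<String> namesToAuthorize = new ArrayList<>();
        final List<RequestTopic> invalid = new ArrayList<>();    // both id and name set (INVALID_REQUEST)
        final List<RequestTopic> unresolved = new ArrayList<>(); // unknown id, or neither id nor name
    }

    static Outcome resolve(int version, List<RequestTopic> topics, Map<UUID, String> topicNames) {
        Outcome outcome = new Outcome();
        for (RequestTopic topic : topics) {
            // Topic ids are only meaningful from version 9 onwards.
            boolean hasId = version >= 9 && topic.topicId != null && !ZERO_UUID.equals(topic.topicId);
            boolean hasName = topic.name != null && !topic.name.isEmpty();
            if (hasId && hasName) {
                outcome.invalid.add(topic);
            } else if (hasId) {
                String resolved = topicNames.get(topic.topicId);
                if (resolved == null) {
                    outcome.unresolved.add(topic);
                } else {
                    topic.name = resolved; // plain null check, no Option allocated
                    outcome.namesToAuthorize.add(resolved);
                }
            } else if (hasName) {
                outcome.namesToAuthorize.add(topic.name);
            } else {
                outcome.unresolved.add(topic);
            }
        }
        return outcome;
    }
}
```

With this shape, `namesToAuthorize` can be passed to the authorizer directly, without a second iteration or a `filter` over the request topics.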



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1351,8 +1351,11 @@ class KafkaApisTest {
 
   @Test
   def testHandleOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = {

Review Comment:
   It would be great if we could extend the tests here. I think that we need to 
use multiple unresolvable topic ids in the same request and also check the 
different versions. I am not sure if we could extend this one or if we should 
add other ones.



##########
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##########
@@ -47,8 +49,10 @@
       "about": "The time period in ms to retain the offset." },
     { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
       "about": "The topics to commit offsets for.",  "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "9+", "entityType": "topicName",
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "9+",
+        "about": "The unique topic ID" },

Review Comment:
   nit: Could we add `.` at the end?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -425,35 +425,72 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val topicNames =
+        if (offsetCommitRequest.version() >= 9)
+          metadataCache.topicIdsToNames()
+        else
+          Collections.emptyMap[Uuid, String]()
+
+      // For version < 9, lookup from topicNames fails and the topic name (which cannot be null) is returned.
+      // For version >= 9, if lookup from topicNames fails, there are two possibilities:
+      //
+      // a) The topic ID was left to default and the topic name should have been populated as a fallback instead.
+      //    If none was provided, null is returned.
+      //
+      // b) The topic ID was not default but is not present in the local topic IDs cache. In this case, because
+      //    clients should make exclusive use of topic name or topic ID, the topic name should be null. If however
+      //    the client provided a topic name, we do not want to use it, because any topic with the same name
+      //    present locally would then have a topic ID which does not match the topic ID in the request.
+      def resolveTopicName(topic: OffsetCommitRequestData.OffsetCommitRequestTopic): String = {
+          val resolvedFromId = topicNames.get(topic.topicId())
+          if (resolvedFromId != null)
+            resolvedFromId
+          else if (offsetCommitRequest.version() < 9 || Uuid.ZERO_UUID.equals(topic.topicId)) {
+            topic.name()
+          } else {
+            null
+          }
+      }
+
       val authorizedTopics = authHelper.filterByAuthorized(
         request.context,
         READ,
         TOPIC,
-        offsetCommitRequest.data.topics.asScala
-      )(_.name)
+        offsetCommitRequest.data.topics.asScala.filter(topic => resolveTopicName(topic) != null)
+      )(resolveTopicName)
 
       val responseBuilder = new OffsetCommitResponse.Builder()
       val authorizedTopicsRequest = new mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic]()
       offsetCommitRequest.data.topics.forEach { topic =>
-        if (!authorizedTopics.contains(topic.name)) {
+        val topicName = resolveTopicName(topic)
+        if (topicName == null) {
+          // Topic name cannot be null for version < 9. From version >= 9, topicName is null iff it cannot
+          // be resolved from the local topic IDs cache or topic ID was left to default but no fallback topic
+          // name was provided.
+          responseBuilder.addPartitions[OffsetCommitRequestData.OffsetCommitRequestPartition](

Review Comment:
   This issue is still present. Yeah, we definitely need to update the response 
builder to support this. One way would be to change the semantics of 
`addPartitions` to directly add to the response when it is called and to only 
put the topic in the HashMap when `addPartition` is used.
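
A minimal sketch of that alternative, under the assumption that the bulk path never needs to merge with earlier entries: `addPartitions` appends a fresh topic entry on every call, and only `addPartition` goes through the name index. `DirectAppendBuilderSketch` and its nested `Topic` are hypothetical stand-ins, not the real response builder, and `java.util.UUID` replaces Kafka's `Uuid`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of a builder whose bulk path bypasses the name index.
final class DirectAppendBuilderSketch {

    // Stand-in for OffsetCommitResponseTopic.
    static final class Topic {
        final String name;
        final UUID topicId;
        final List<Integer> partitions = new ArrayList<>();

        Topic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    private final List<Topic> topics = new ArrayList<>();
    private final Map<String, Topic> byName = new HashMap<>();

    // Bulk path: always appends a new entry, so two unknown ids never share one.
    void addPartitions(String name, UUID topicId, List<Integer> partitions) {
        Topic topic = new Topic(name, topicId);
        topic.partitions.addAll(partitions);
        topics.add(topic);
    }

    // Incremental path: the only place where a topic is indexed by name for reuse.
    void addPartition(String name, UUID topicId, int partition) {
        Topic topic = byName.get(name);
        if (topic == null) {
            topic = new Topic(name, topicId);
            byName.put(name, topic);
            topics.add(topic);
        }
        topic.partitions.add(partition);
    }

    List<Topic> topics() {
        return topics;
    }
}
```

The trade-off in this sketch is that calling `addPartitions` twice for the same topic produces two entries, which is acceptable only if callers invoke it at most once per topic.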



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2604,11 +2650,95 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(new OffsetCommitResponseSpec()
+                .expectedOffsets(singletonMap(t1p, 100L))
+                .error(Errors.UNKNOWN_MEMBER_ID));
+
         assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
     }
 
+    @Test
+    public void testCommitOffsetUnknownTopicId() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If there is no available
+        // response to consume, its internal poll loop never completes. Hence, the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there are enough responses
+        // queued in the MockClient to satisfy all invocations of the ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+                prepareOffsetCommitRequest(new OffsetCommitResponseSpec()
+                        .expectedOffsets(singletonMap(t1p, 100L))
+                        .error(Errors.UNKNOWN_TOPIC_ID)));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false boolean indicating the
+        // offset commit failed is returned.
+        assertFalse(coordinator.commitOffsetsSync(singletonMap(t1p,
+                new OffsetAndMetadata(100L, "metadata")), time.timer(timeoutMs)));
+    }
+
+    @Test
+    public void testRetryCommitUnknownTopicId() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+
+        assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p,
+                new OffsetAndMetadata(100L, "metadata")), time.timer(Long.MAX_VALUE)));
+    }
+
+    @Test
+    public void testTopicIdsArePopulatedByTheConsumerCoordinator() {

Review Comment:
   We also need tests to check if the response is handled correctly. 



##########
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java:
##########
@@ -139,4 +151,65 @@ public void testVersionSupportForGroupInstanceId() {
             }
         }
     }
+
+    @Test
+    public void testHandlingOfTopicIdInAllVersions() {
+        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {

Review Comment:
   nit: You could replace this with the following:
   ```
       @ParameterizedTest
       @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
   ```



##########
clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java:
##########
@@ -139,4 +151,65 @@ public void testVersionSupportForGroupInstanceId() {
             }
         }
     }
+
+    @Test
+    public void testHandlingOfTopicIdInAllVersions() {
+        for (short version : ApiKeys.OFFSET_COMMIT.allVersions()) {
+            OffsetCommitRequest request = new OffsetCommitRequest.Builder(data).build(version);
+            List<OffsetCommitRequestTopic> requestTopics = request.data().topics();
+
+            if (version >= 9) {
+                // Version >= 9:
+                //   Topic ID may be present or not. Both are valid cases. If no topic ID is provided (null or
+                //   set to ZERO_UUID), a topic name must be provided and will be used. If a topic ID is provided,
+                //   the name will be nullified.
+                assertNull(requestTopics.get(0).name());
+                assertEquals(topicOneId, requestTopics.get(0).topicId());
+
+                assertEquals(topicTwo, requestTopics.get(1).name());
+                assertEquals(Uuid.ZERO_UUID, requestTopics.get(1).topicId());
+

Review Comment:
   nit: We could remove this empty line.



##########
clients/src/main/resources/common/message/OffsetCommitResponse.json:
##########
@@ -28,15 +28,19 @@
   // Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 adds TopicId field (KIP-848).
+  "validVersions": "0-9",
   "flexibleVersions": "8+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+",
       "about": "The responses for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0+", "nullableVersions": "9+", "entityType": "topicName",
         "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "9+",
+        "about": "The unique topic ID" },

Review Comment:
   nit: Could we add `.` at the end?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1374,7 +1379,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
                         } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                             unauthorizedTopics.add(tp.topic());
                         } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
-                                || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+                                || error == Errors.INVALID_COMMIT_OFFSET_SIZE
+                                || error == Errors.UNKNOWN_TOPIC_ID) {

Review Comment:
   At L1361 in this file, we construct `TopicPartition` based on the response 
data but we don't resolve the topic id. I think that we should add the 
resolution there as well, no? We probably need to extend tests to better cover 
this as well.
   
   Regarding `UNKNOWN_TOPIC_ID`, would it make sense to place it after 
`UNKNOWN_TOPIC_OR_PARTITION` as they are quite similar?
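
For the response side, the resolution could look roughly like the sketch below: when a response entry carries a non-zero topic id, the name is looked up from an id-to-name map before the `TopicPartition` is built; otherwise the name in the response is used as is. `ResponseTopicNameSketch`, `topicNameFor`, and the `topicNamesById` map are hypothetical and do not reflect how `ConsumerCoordinator` is actually structured; `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of resolving a topic name from a response entry on the client.
final class ResponseTopicNameSketch {

    // Stand-in for Kafka's Uuid.ZERO_UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Returns the topic name to use for the TopicPartition, or null if the id is unknown locally.
    static String topicNameFor(String name, UUID topicId, Map<UUID, String> topicNamesById) {
        if (topicId != null && !ZERO_UUID.equals(topicId)) {
            // Version >= 9 style entry: the name is expected to be null, so resolve from the id.
            return topicNamesById.get(topicId);
        }
        // Older versions, or no id provided: fall back to the name carried by the response.
        return name;
    }
}
```

A null result would need its own handling in the caller (for example, treating the partition as failed), which is one of the cases the extended tests would have to cover.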


