dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1128110858


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -199,7 +199,8 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
                             .setGenerationId(generation.generationId)
                             .setMemberId(generation.memberId)
                             .setGroupInstanceId(groupInstanceId)
-                            .setTopics(new ArrayList<>(requestTopicDataMap.values())));
+                            .setTopics(new ArrayList<>(requestTopicDataMap.values())),
+                    false /* Support of topic ids will be added with KAFKA-14777 */);

Review Comment:
   nit: We usually don't leave comments like this in our code base.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1352,8 +1370,22 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
             Set<String> unauthorizedTopics = new HashSet<>();
 
             for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+                String topicName = topic.name();
+
+                if (commitResponse.version() >= 9) {
+                    topicName = topicResolver.getTopicName(topic.topicId()).orElse(null);
+
+                    if (topicName == null) {
+                        // OffsetCommit responses version 9 must use topic IDs. The topic's ID must have been
+                        // known by the client which sent the OffsetCommitRequest but was removed from the metadata
+                        // before the response was received.

Review Comment:
   Is this really true? Since we keep the `TopicResolver` that was used to construct the request, all the topics should be in it. This case could only happen if the server returned an unexpected topic ID that was neither in the request nor in the `TopicResolver`. Do I understand this correctly?
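   If that reading is right, a miss in the resolver captured for the request means the broker returned an ID the client never sent, rather than metadata changing in flight. A minimal sketch of one way to surface it, assuming a plain `Map<UUID, String>` snapshot in place of `TopicResolver` and `java.util.UUID` in place of Kafka's `Uuid`; the helper name `resolveOrFail` is hypothetical, and whether to throw or to fail only the affected partitions is left open:

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: Map<UUID, String> stands in for the TopicResolver captured when
// the request was built, and java.util.UUID stands in for Kafka's Uuid.
final class ResponseTopicResolution {
    private ResponseTopicResolution() { }

    /**
     * Resolves the name of a topic returned in a version 9+ OffsetCommit response.
     * Because the same snapshot was used to build the request, a miss means the
     * broker returned a topic ID that the client never sent.
     */
    static String resolveOrFail(Map<UUID, String> requestSnapshot, UUID topicId) {
        String topicName = requestSnapshot.get(topicId);
        if (topicName == null) {
            throw new IllegalStateException("OffsetCommit response contains topic ID "
                    + topicId + ", which was not part of the request");
        }
        return topicName;
    }
}
```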



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1323,27 +1335,33 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
             groupInstanceId = null;
         }
 
+        boolean canUseTopicIds = topicPartitionsWithoutTopicId == 0;
+
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                 new OffsetCommitRequestData()
                         .setGroupId(this.rebalanceConfig.groupId)
                         .setGenerationId(generation.generationId)
                         .setMemberId(generation.memberId)
                         .setGroupInstanceId(groupInstanceId)
-                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
+                        .setTopics(new ArrayList<>(requestTopicDataMap.values())),
+                canUseTopicIds
         );
 
         log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
 
         return client.send(coordinator, builder)
-                .compose(new OffsetCommitResponseHandler(offsets, generation));
+                .compose(new OffsetCommitResponseHandler(offsets, generation, topicResolver));
     }
 
     private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
         private final Map<TopicPartition, OffsetAndMetadata> offsets;
+        private final TopicResolver topicResolver;
 
-        private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets, Generation generation) {
+        private OffsetCommitResponseHandler(
+                Map<TopicPartition, OffsetAndMetadata> offsets, Generation generation, TopicResolver topicResolver) {

Review Comment:
   nit: We usually don't break long lines like this. I personally prefer the following:
   
   ```
   private OffsetCommitResponseHandler(
        Map<TopicPartition, OffsetAndMetadata> offsets,
        Generation generation,
        TopicResolver topicResolver
   ) {
   ```
   
   You can find other accepted styles in the code base.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1272,6 +1275,9 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
         if (coordinator == null)
             return RequestFuture.coordinatorNotAvailable();
 
+        TopicResolver topicResolver = metadata.topicResolver();
+        int topicPartitionsWithoutTopicId = 0;

Review Comment:
   nit: Should we use a boolean here instead of a counter?
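   A minimal sketch of the boolean variant, assuming a `Map<String, UUID>` in place of `TopicResolver`, `java.util.UUID` in place of Kafka's `Uuid`, and a hypothetical `canUseTopicIds` helper; it also uses the constant-first `ZERO_UUID.equals(...)` comparison:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: Map<String, UUID> stands in for TopicResolver and java.util.UUID
// for Kafka's Uuid; only the flag handling mirrors the diff.
final class TopicIdFlag {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private TopicIdFlag() { }

    /** Returns true only if every topic in the commit has a known, non-zero ID. */
    static boolean canUseTopicIds(List<String> topics, Map<String, UUID> topicIds) {
        boolean canUseTopicIds = true;
        for (String topic : topics) {
            UUID topicId = topicIds.getOrDefault(topic, ZERO_UUID);
            if (ZERO_UUID.equals(topicId)) {
                // One topic without an ID is enough to cap the request at version 8.
                // No early break: in the real loop each partition is still added to the request.
                canUseTopicIds = false;
            }
        }
        return canUseTopicIds;
    }
}
```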



##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {

Review Comment:
   I am not really happy with this name, but I could not find a better one yet. My concern is that this class is really about resolving topic IDs/names and not really about topics per se. Have you considered any alternatives?



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -49,7 +46,14 @@ public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest>
         private final OffsetCommitRequestData data;
 
         public Builder(OffsetCommitRequestData data) {

Review Comment:
   Is this constructor still used?



##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {
+    private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;
+
+    /**
+     * A resolver which universe of topic ids and names is captured from the input map. The reverse association
+     * between a topic ID and a topic name is computed by this method. If there are more than one topic name
+     * resolving to the same topic ID, an {@link InvalidTopicException} is thrown.
+     */
+    public static TopicResolver fromTopicIds(Map<String, Uuid> topicIds) {
+        Map<Uuid, String> topicNames = new HashMap<>(topicIds.size());
+
+        for (Map.Entry<String, Uuid> e: topicIds.entrySet()) {
+            String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey());
+            if (conflicting != null) {
+                throw new InvalidTopicException(

Review Comment:
   Should this be an InvalidStateException?
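   For reference, a minimal sketch of the inversion with `java.lang.IllegalStateException` swapped in for `InvalidTopicException`, on the reasoning that a shared ID describes inconsistent local metadata rather than a malformed topic. `ReverseTopicMap.invert` is a hypothetical name, `java.util.UUID` stands in for Kafka's `Uuid`, and whether one of Kafka's own error classes is a better fit is not settled here:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of inverting name -> ID into ID -> name while enforcing a bijection.
final class ReverseTopicMap {
    private ReverseTopicMap() { }

    /** Inverts name -> ID, rejecting any ID that is shared by two topic names. */
    static Map<UUID, String> invert(Map<String, UUID> topicIds) {
        Map<UUID, String> topicNames = new HashMap<>(topicIds.size());
        for (Map.Entry<String, UUID> e : topicIds.entrySet()) {
            String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey());
            if (conflicting != null) {
                // Two names for one ID means the input mapping itself is inconsistent.
                throw new IllegalStateException("Topic " + e.getKey() + " shares the same ID "
                        + e.getValue() + " as topic " + conflicting);
            }
        }
        return Collections.unmodifiableMap(topicNames);
    }
}
```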



##########
clients/src/test/java/org/apache/kafka/test/MoreAssertions.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.function.Function.identity;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public final class MoreAssertions {

Review Comment:
   This does not look good. It would be better to place these helpers in `OffsetCommitRequestTest`, for instance, or to keep them where they are used.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -425,35 +426,59 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
+      val topicResolver = metadataCache.topicResolver()
+      val responseBuilder = new OffsetCommitResponse.Builder(topicResolver, offsetCommitRequest.version())
+
+      val resolvedTopics =
+        if (offsetCommitRequest.version() < 9)
+          offsetCommitRequest.data.topics().asScala

Review Comment:
   nit: You can omit the `()` after `topics`, as we usually don't use them on getters in Scala. There are a few other occurrences in the PR.



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -167,8 +213,24 @@ public <P> Builder addPartitions(
         }
 
         public Builder merge(
-            OffsetCommitResponseData newData
+            OffsetCommitResponseData newData,
+            Logger logger
         ) {
+            if (version >= 9) {
+                // This method is called after the group coordinator committed the offsets. The group coordinator
+                // provides the OffsetCommitResponseData it built in the process. As of now, this data does
+                // not contain topic ids, so we resolve them here.
+                newData.topics().forEach(topic -> {
+                    Uuid topicId = topicResolver.getTopicId(topic.name()).orElse(Uuid.ZERO_UUID);
+                    if (Uuid.ZERO_UUID.equals(topicId)) {

Review Comment:
   Should we just throw an `IllegalStateException` if we end up with a topic without an ID? Ignoring it seems risky.
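   A minimal sketch of the fail-fast variant, assuming a `Map<String, UUID>` in place of `TopicResolver`, `java.util.UUID` in place of `Uuid`, and the response topics reduced to a list of names; `resolveIds` is a hypothetical helper, not the PR's `Builder.merge`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: Map<String, UUID> stands in for TopicResolver, java.util.UUID for
// Kafka's Uuid, and a list of topic names for the topics in the coordinator's response data.
final class ResponseTopicIds {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private ResponseTopicIds() { }

    /** Attaches topic IDs for version 9+ responses, failing fast instead of skipping a topic. */
    static List<UUID> resolveIds(List<String> topicNames, Map<String, UUID> topicIds, short version) {
        List<UUID> result = new ArrayList<>(topicNames.size());
        for (String topicName : topicNames) {
            if (version < 9) {
                // Older versions identify topics by name only, so no ID is needed.
                result.add(ZERO_UUID);
                continue;
            }
            UUID topicId = topicIds.getOrDefault(topicName, ZERO_UUID);
            if (ZERO_UUID.equals(topicId)) {
                throw new IllegalStateException("No topic ID known for committed topic " + topicName);
            }
            result.add(topicId);
        }
        return result;
    }
}
```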



##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -90,6 +90,10 @@ trait MetadataCache {
 
   def topicIdsToNames(): util.Map[Uuid, String]
 
+  def topicResolver(): TopicResolver = {
+    TopicResolver.wrap(topicNamesToIds(), topicIdsToNames())

Review Comment:
   I think that there is a race condition here. You have no guarantee that both maps are consistent with each other.
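   One common way to rule this out is to derive both directions from a single immutable snapshot published through one reference. A minimal sketch, assuming nothing about how the real `MetadataCache` stores its state; `TopicIdSnapshotHolder` is hypothetical and `java.util.UUID` stands in for `Uuid`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: both lookup directions are derived from one immutable snapshot and
// published through a single volatile field. A reader that calls snapshot() once can never
// pair the name->ID map of one metadata update with the ID->name map of another.
final class TopicIdSnapshotHolder {
    static final class Snapshot {
        final Map<String, UUID> topicIds;
        final Map<UUID, String> topicNames;

        Snapshot(Map<String, UUID> topicIds) {
            Map<UUID, String> names = new HashMap<>();
            topicIds.forEach((name, id) -> names.put(id, name));
            // Defensive copies so later changes to the caller's map cannot leak in.
            this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
            this.topicNames = Collections.unmodifiableMap(names);
        }
    }

    private volatile Snapshot current = new Snapshot(Collections.emptyMap());

    /** Replaces both directions together by swapping a single reference. */
    void update(Map<String, UUID> newTopicIds) {
        current = new Snapshot(newTopicIds);
    }

    /** Reads the volatile field once; callers should use both maps from the same snapshot. */
    Snapshot snapshot() {
        return current;
    }
}
```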



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1281,10 +1287,16 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
                 return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
             }
 
+            Uuid topicId = topicResolver.getTopicId(topicPartition.topic()).orElse(ZERO_UUID);
+            if (topicId.equals(ZERO_UUID)) {

Review Comment:
   nit: I usually prefer to use `ZERO_UUID.equals(...)` as it is safe for null values.
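   A minimal sketch of the difference, with `java.util.UUID` standing in for Kafka's `Uuid` and a locally defined `ZERO_UUID`; the class and method names are hypothetical:

```java
import java.util.UUID;

// Hypothetical sketch contrasting constant-first and receiver-first equality checks.
final class NullSafeEquals {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private NullSafeEquals() { }

    /** Constant-first comparison: returns false for null instead of throwing. */
    static boolean isZero(UUID topicId) {
        return ZERO_UUID.equals(topicId);
    }

    /** Receiver-first comparison: throws NullPointerException when topicId is null. */
    static boolean isZeroUnsafe(UUID topicId) {
        return topicId.equals(ZERO_UUID);
    }
}
```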



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -49,7 +46,14 @@ public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest>
         private final OffsetCommitRequestData data;
 
         public Builder(OffsetCommitRequestData data) {
-            super(ApiKeys.OFFSET_COMMIT);
+            this(data, true);
+        }
+
+        public Builder(OffsetCommitRequestData data, boolean canUseTopicIds) {
+            // Version 8 is the maximum version that can be used without topic IDs.
+            super(ApiKeys.OFFSET_COMMIT,
+                ApiKeys.OFFSET_COMMIT.oldestVersion(),
+                canUseTopicIds ? ApiKeys.OFFSET_COMMIT.latestVersion() : (short) 8);

Review Comment:
   nit: When we break the line like this, we usually align the remaining arguments with the first one. Otherwise, you can use the style that I mentioned earlier.
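   A minimal sketch of the aligned style, which also names the version-8 cap instead of using a bare `(short) 8`. The version constants are illustrative values inferred from the diff (version 9 being the first with topic IDs), not read from `ApiKeys.OFFSET_COMMIT`, and the class is hypothetical:

```java
// Hypothetical sketch: the version bounds below are illustrative constants, not values read
// from ApiKeys.OFFSET_COMMIT; the named constant replaces the bare `(short) 8`.
final class OffsetCommitVersions {
    static final short OLDEST_VERSION = 0;
    static final short LATEST_VERSION = 9;
    // Highest OffsetCommit version that still identifies topics by name only.
    static final short LATEST_VERSION_WITHOUT_TOPIC_IDS = 8;

    private OffsetCommitVersions() { }

    static short latestAllowedVersion(boolean canUseTopicIds) {
        return canUseTopicIds ? LATEST_VERSION : LATEST_VERSION_WITHOUT_TOPIC_IDS;
    }

    /** Returns {oldest, latest}; the wrapped arguments are aligned on the first one. */
    static short[] allowedRange(boolean canUseTopicIds) {
        return versionRange(OLDEST_VERSION,
                            latestAllowedVersion(canUseTopicIds));
    }

    private static short[] versionRange(short oldest, short latest) {
        return new short[] {oldest, latest};
    }
}
```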



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1379,7 +1411,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
                             future.raise(error);
                             return;
                         } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
-                                || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                                || error == Errors.UNKNOWN_TOPIC_OR_PARTITION
+                                || error == Errors.UNKNOWN_TOPIC_ID) {

Review Comment:
   For my understanding, are we going to propagate this error back to the end user?



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
 
+    public short version() {
+        return version;
+    }
+
     public static class Builder {
         OffsetCommitResponseData data = new OffsetCommitResponseData();
         HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+        private final TopicResolver topicResolver;
+        private final short version;
+
+        public Builder(TopicResolver topicResolver, short version) {

Review Comment:
   I am not sure about passing the `TopicResolver` here. My understanding is that we are doing this because topic IDs are lost when we call the group coordinator. Wouldn't it be better to update the group coordinator to preserve those topic IDs? We may be able to handle this in the GroupCoordinatorAdaptor, or we could switch to using TopicIdPartitions. We could also consider doing this in a separate PR, as this one is already quite large.



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -50,14 +52,21 @@
 public class OffsetCommitResponse extends AbstractResponse {
 
     private final OffsetCommitResponseData data;
+    private final short version;

Review Comment:
   Did you check how we did this for the FetchRequest?



##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe

Review Comment:
   We don't really use those annotations in our code base at the moment. We usually just mention those characteristics in the Javadoc.



##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {
+    private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;
+
+    /**
+     * A resolver which universe of topic ids and names is captured from the input map. The reverse association
+     * between a topic ID and a topic name is computed by this method. If there are more than one topic name
+     * resolving to the same topic ID, an {@link InvalidTopicException} is thrown.
+     */
+    public static TopicResolver fromTopicIds(Map<String, Uuid> topicIds) {
+        Map<Uuid, String> topicNames = new HashMap<>(topicIds.size());
+
+        for (Map.Entry<String, Uuid> e: topicIds.entrySet()) {
+            String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey());
+            if (conflicting != null) {
+                throw new InvalidTopicException(
+                        "Topic " + e.getKey() + " shares the same ID " + e.getValue() + " as topic " + conflicting);
+            }
+        }
+
+        return new TopicResolver(topicIds, topicNames);
+    }
+
+    /**
+     * A resolver which acts as a wrapper around the input mapping of topic ids from/to topic names.
+     * No validation is performed about the consistency of the mapping. This method is to be preferred
+     * when the copy of the input maps needs to be avoided.
+     */
+    public static TopicResolver wrap(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) {
+        return new TopicResolver(topicIds, topicNames);
+    }
+
+    /**
+     * A resolver with no existing mapping between any topic name and id.
+     */
+    public static TopicResolver emptyResolver() {
+        return fromTopicIds(Collections.emptyMap());
+    }
+
+    private TopicResolver(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) {
+        this.topicIds = Collections.unmodifiableMap(topicIds);
+        this.topicNames = Collections.unmodifiableMap(topicNames);
+    }
+
+    /**
+     * Returns the ID of the topic with the given name, if that association exists.
+     */
+    public Optional<Uuid> getTopicId(String name) {

Review Comment:
   I wonder if using `Optional` is necessary here, given that we always unwrap it immediately with `orElse(null)` or `orElse(ZERO_UUID)`. What do you think?
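   A minimal sketch of the non-`Optional` alternative, returning a sentinel for IDs and `null` for names directly; `DirectTopicLookup` and its method names are hypothetical and `java.util.UUID` stands in for `Uuid`. Callers lose the explicit absence in the signature, but avoid wrapping and immediately unwrapping on every lookup:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: lookups return a sentinel or null directly instead of Optional.
// java.util.UUID stands in for Kafka's Uuid.
final class DirectTopicLookup {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Map<String, UUID> topicIds;
    private final Map<UUID, String> topicNames;

    DirectTopicLookup(Map<String, UUID> topicIds) {
        Map<UUID, String> names = new HashMap<>();
        topicIds.forEach((name, id) -> names.put(id, name));
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
        this.topicNames = Collections.unmodifiableMap(names);
    }

    /** Returns the topic's ID, or ZERO_UUID when the name is unknown. */
    UUID topicIdOrZero(String topicName) {
        return topicIds.getOrDefault(topicName, ZERO_UUID);
    }

    /** Returns the topic's name, or null when the ID is unknown. */
    String topicNameOrNull(UUID topicId) {
        return topicNames.get(topicId);
    }
}
```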



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -35,6 +39,7 @@
  * Possible error codes:
  *
  *   - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ *   - {@link Errors#UNKNOWN_TOPIC_ID}

Review Comment:
   nit: Should we also list the other possible error codes?



-- 
This is an automated message from the Apache Git Service.