dajac commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1128110858

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -199,7 +199,8 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
.setGenerationId(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
- .setTopics(new ArrayList<>(requestTopicDataMap.values())));
+ .setTopics(new ArrayList<>(requestTopicDataMap.values())),
+ false /* Support of topic ids will be added with KAFKA-14777 */);

Review Comment:
nit: We usually don't leave such comments in our code base.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1352,8 +1370,22 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
Set<String> unauthorizedTopics = new HashSet<>();
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
+ String topicName = topic.name();
+
+ if (commitResponse.version() >= 9) {
+ topicName = topicResolver.getTopicName(topic.topicId()).orElse(null);
+
+ if (topicName == null) {
+ // OffsetCommit responses version 9 must use topic IDs. The topic's ID must have been
+ // known by the client which sent the OffsetCommitRequest but was removed from the metadata
+ // before the response was received.

Review Comment:
Is this really true? As we keep the `TopicResolver` used to construct the request, all topics should be there. This case could happen if the server returns an unexpected topic id that was not in the request and that is not in the `TopicResolver`. Do I get this right?
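The reasoning in this comment can be illustrated with a minimal sketch, assuming the resolver handed to the response handler is an immutable copy of the ID-to-name mapping taken when the request was built. `ResponseTopicIdCheck` and `unresolvedTopicIds` are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid`. Once the mapping is copied, a later metadata update cannot remove a topic that was part of the request, so any ID that fails to resolve must be one the request did not contain.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical sketch, not Kafka code: java.util.UUID stands in for
 * org.apache.kafka.common.Uuid, and this class plays the role of the
 * TopicResolver captured when the OffsetCommit request was built.
 */
public final class ResponseTopicIdCheck {

    private final Map<UUID, String> namesById;

    // Copy the ID -> name mapping at request time, so later metadata updates
    // cannot remove a topic that was part of the request.
    public ResponseTopicIdCheck(Map<UUID, String> namesById) {
        this.namesById = Collections.unmodifiableMap(new HashMap<>(namesById));
    }

    // Returns the topic IDs from the response that cannot be resolved. Assuming
    // the request was built only from topics in this snapshot, anything returned
    // here is an ID that the request did not contain.
    public List<UUID> unresolvedTopicIds(List<UUID> responseTopicIds) {
        List<UUID> unresolved = new ArrayList<>();
        for (UUID topicId : responseTopicIds) {
            if (!namesById.containsKey(topicId)) {
                unresolved.add(topicId);
            }
        }
        return unresolved;
    }
}
```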

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1323,27 +1335,33 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
groupInstanceId = null;
}
+ boolean canUseTopicIds = topicPartitionsWithoutTopicId == 0;
+
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationId(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
- .setTopics(new ArrayList<>(requestTopicDataMap.values()))
+ .setTopics(new ArrayList<>(requestTopicDataMap.values())),
+ canUseTopicIds
);
log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
return client.send(coordinator, builder)
- .compose(new OffsetCommitResponseHandler(offsets, generation));
+ .compose(new OffsetCommitResponseHandler(offsets, generation, topicResolver));
}
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
+ private final TopicResolver topicResolver;
- private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets, Generation generation) {
+ private OffsetCommitResponseHandler(
+ Map<TopicPartition, OffsetAndMetadata> offsets, Generation generation, TopicResolver topicResolver) {

Review Comment:
nit: We usually don't break long lines like this. I personally prefer the following:
```
private OffsetCommitResponseHandler(
    Map<TopicPartition, OffsetAndMetadata> offsets,
    Generation generation,
    TopicResolver topicResolver
) {
```
You can find other ways in the code base.
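This hunk derives `canUseTopicIds` from `topicPartitionsWithoutTopicId == 0` and passes it to `OffsetCommitRequest.Builder`, which in this PR caps the version at 8 when the flag is false. A minimal sketch of that selection, tracking the condition as a boolean rather than a counter, follows. `OffsetCommitVersionSketch` and its methods are hypothetical; `java.util.UUID` stands in for Kafka's `Uuid`, and a local `ZERO_UUID` mirrors `Uuid.ZERO_UUID`. The latest version is passed in rather than read from `ApiKeys`.

```java
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical helper, not part of the PR: java.util.UUID stands in for
 * org.apache.kafka.common.Uuid, and ZERO_UUID mirrors Uuid.ZERO_UUID.
 */
public final class OffsetCommitVersionSketch {

    public static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Per the PR, version 8 is the highest OffsetCommit version without topic IDs.
    public static final short MAX_VERSION_WITHOUT_TOPIC_IDS = 8;

    private OffsetCommitVersionSketch() {
    }

    // A boolean is enough: topic IDs can be used only if every committed topic
    // has a known, non-zero ID. The loop stops at the first topic without one.
    public static boolean canUseTopicIds(Collection<String> topics, Map<String, UUID> topicIds) {
        for (String topic : topics) {
            UUID topicId = topicIds.get(topic);
            // ZERO_UUID.equals(...) tolerates null, but null is checked explicitly
            // because a missing ID must also disable topic IDs.
            if (topicId == null || ZERO_UUID.equals(topicId)) {
                return false;
            }
        }
        return true;
    }

    // Mirrors the Builder change: the latest version when topic IDs can be
    // used, otherwise version 8 at most.
    public static short maxVersion(boolean canUseTopicIds, short latestVersion) {
        return canUseTopicIds ? latestVersion : MAX_VERSION_WITHOUT_TOPIC_IDS;
    }
}
```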

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1272,6 +1275,9 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
+ TopicResolver topicResolver = metadata.topicResolver();
+ int topicPartitionsWithoutTopicId = 0;

Review Comment:
nit: Should we use a boolean?

##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {

Review Comment:
I am not really happy with this name but I could not find a better one yet. My concern is that this class is really about resolving topic ids/names and not really topics per se. Have you considered any alternatives?

##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -49,7 +46,14 @@ public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest>
private final OffsetCommitRequestData data;
public Builder(OffsetCommitRequestData data) {

Review Comment:
Is this constructor still used?

##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {
+ private final Map<String, Uuid> topicIds;
+ private final Map<Uuid, String> topicNames;
+
+ /**
+ * A resolver which universe of topic ids and names is captured from the input map. The reverse association
+ * between a topic ID and a topic name is computed by this method. If there are more than one topic name
+ * resolving to the same topic ID, an {@link InvalidTopicException} is thrown.
+ */
+ public static TopicResolver fromTopicIds(Map<String, Uuid> topicIds) {
+ Map<Uuid, String> topicNames = new HashMap<>(topicIds.size());
+
+ for (Map.Entry<String, Uuid> e: topicIds.entrySet()) {
+ String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey());
+ if (conflicting != null) {
+ throw new InvalidTopicException(

Review Comment:
Should this be an InvalidStateException?

##########
clients/src/test/java/org/apache/kafka/test/MoreAssertions.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.function.Function.identity;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public final class MoreAssertions {

Review Comment:
This does not look good. It would be better to place those helpers in `OffsetCommitRequestTest`, for instance, or to keep them where they are used.

##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -425,35 +426,59 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, offsetCommitRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
} else {
+ val topicResolver = metadataCache.topicResolver()
+ val responseBuilder = new OffsetCommitResponse.Builder(topicResolver, offsetCommitRequest.version())
+
+ val resolvedTopics =
+ if (offsetCommitRequest.version() < 9)
+ offsetCommitRequest.data.topics().asScala

Review Comment:
nit: You can omit the `()` after `topics` as we usually don't put them for getters in Scala. There are a few other cases in the PR.

##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -167,8 +213,24 @@ public <P> Builder addPartitions(
}
public Builder merge(
- OffsetCommitResponseData newData
+ OffsetCommitResponseData newData,
+ Logger logger
) {
+ if (version >= 9) {
+ // This method is called after the group coordinator committed the offsets. The group coordinator
+ // provides the OffsetCommitResponseData it built in the process. As of now, this data does
+ // not contain topic ids, so we resolve them here.
+ newData.topics().forEach(topic -> {
+ Uuid topicId = topicResolver.getTopicId(topic.name()).orElse(Uuid.ZERO_UUID);
+ if (Uuid.ZERO_UUID.equals(topicId)) {

Review Comment:
Should we just throw an illegal state exception if we end up having a topic without an id? Ignoring it seems risky.

##########
core/src/main/scala/kafka/server/MetadataCache.scala:
##########
@@ -90,6 +90,10 @@ trait MetadataCache {
def topicIdsToNames(): util.Map[Uuid, String]
+ def topicResolver(): TopicResolver = {
+ TopicResolver.wrap(topicNamesToIds(), topicIdsToNames())

Review Comment:
I think that there is a race condition here. You have no guarantee that both maps are consistent with each other.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1281,10 +1287,16 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
}
+ Uuid topicId = topicResolver.getTopicId(topicPartition.topic()).orElse(ZERO_UUID);
+ if (topicId.equals(ZERO_UUID)) {

Review Comment:
nit: I usually prefer to use `ZERO_UUID.equals(...)` as it is safe for null values.

##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java:
##########
@@ -49,7 +46,14 @@ public static class Builder extends AbstractRequest.Builder<OffsetCommitRequest>
private final OffsetCommitRequestData data;
public Builder(OffsetCommitRequestData data) {
- super(ApiKeys.OFFSET_COMMIT);
+ this(data, true);
+ }
+
+ public Builder(OffsetCommitRequestData data, boolean canUseTopicIds) {
+ // Version 8 is the maximum version that can be used without topic IDs.
+ super(ApiKeys.OFFSET_COMMIT,
+ ApiKeys.OFFSET_COMMIT.oldestVersion(),
+ canUseTopicIds ? ApiKeys.OFFSET_COMMIT.latestVersion() : (short) 8);

Review Comment:
nit: When we break the line like this, we usually align the arguments on the first one. Otherwise, you can use the style that I mentioned earlier.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1379,7 +1411,8 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
future.raise(error);
return;
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
- || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ || error == Errors.UNKNOWN_TOPIC_OR_PARTITION
+ || error == Errors.UNKNOWN_TOPIC_ID) {

Review Comment:
For my understanding, are we going to propagate this error back to the end user?

##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -119,28 +131,52 @@ public boolean shouldClientThrottle(short version) {
return version >= 4;
}
+ public short version() {
+ return version;
+ }
+
public static class Builder {
OffsetCommitResponseData data = new OffsetCommitResponseData();
HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+ private final TopicResolver topicResolver;
+ private final short version;
+
+ public Builder(TopicResolver topicResolver, short version) {

Review Comment:
I am not sure about passing the `TopicResolver` here. My understanding is that we are doing this because topic ids are lost when we call the group coordinator. Wouldn't it be better to update the group coordinator to preserve those topic ids? We may be able to handle this in the GroupCoordinatorAdapter, or we could switch to using TopicIdPartitions. We could also consider doing this in a separate PR, as this one is already quite large.
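On the race condition flagged in `MetadataCache.topicResolver()`: `topicNamesToIds()` and `topicIdsToNames()` are read through two separate calls, so nothing guarantees that both maps come from the same metadata version. A common way to avoid this is to build both directions from one source and publish them together behind a single volatile reference. The sketch is hypothetical (`SnapshotCacheSketch` and `IdNameSnapshot` are not Kafka classes, and `java.util.UUID` stands in for `Uuid`); it shows the technique in Java rather than in the Scala `MetadataCache` trait, and assumes IDs are unique, as `TopicResolver` does.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/**
 * Hypothetical sketch, not Kafka's MetadataCache: java.util.UUID stands in
 * for org.apache.kafka.common.Uuid.
 */
public final class SnapshotCacheSketch {

    /** Both directions of the name/ID mapping, built together and never mutated. */
    public static final class IdNameSnapshot {
        private final Map<String, UUID> idsByName;
        private final Map<UUID, String> namesById;

        IdNameSnapshot(Map<String, UUID> idsByName) {
            // Derive the reverse map from the same source, assuming IDs are unique
            // (the bijection TopicResolver also assumes).
            Map<UUID, String> reverse = new HashMap<>();
            for (Map.Entry<String, UUID> e : idsByName.entrySet()) {
                reverse.put(e.getValue(), e.getKey());
            }
            this.idsByName = Collections.unmodifiableMap(new HashMap<>(idsByName));
            this.namesById = Collections.unmodifiableMap(reverse);
        }

        public Optional<UUID> topicId(String name) {
            return Optional.ofNullable(idsByName.get(name));
        }

        public Optional<String> topicName(UUID topicId) {
            return Optional.ofNullable(namesById.get(topicId));
        }
    }

    // Writers replace the whole snapshot; readers never see half of an update.
    private volatile IdNameSnapshot current = new IdNameSnapshot(Collections.emptyMap());

    public void update(Map<String, UUID> idsByName) {
        current = new IdNameSnapshot(idsByName);
    }

    // One volatile read yields a matching pair of maps, unlike two separate
    // getter calls that may observe different metadata versions.
    public IdNameSnapshot snapshot() {
        return current;
    }
}
```

A caller would take `snapshot()` once per request and use only that object, which is the property the current `TopicResolver.wrap(topicNamesToIds(), topicIdsToNames())` call cannot guarantee.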

##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -50,14 +52,21 @@ public class OffsetCommitResponse extends AbstractResponse {
private final OffsetCommitResponseData data;
+ private final short version;

Review Comment:
Did you check how we did this for the FetchRequest?

##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe

Review Comment:
We don't really use those in our code base at the moment. We usually just mention those characteristics in the Javadoc.

##########
clients/src/main/java/org/apache/kafka/common/TopicResolver.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Encapsulates the resolution of a topic name from its ID, or its ID from its name, using a local
+ * bidirectional mapping. This resolver assumes there is a bijection between topic IDs and topic names.
+ * <p></p>
+ * Note that this class intends to be used for the (reverse) lookup of topic IDs/names, but
+ * not to characterize the set of topics which are known by a client. Use the
+ * {@link org.apache.kafka.clients.MetadataCache} for that purpose.
+ */
+//@Immutable, @ThreadSafe
+public class TopicResolver {
+ private final Map<String, Uuid> topicIds;
+ private final Map<Uuid, String> topicNames;
+
+ /**
+ * A resolver which universe of topic ids and names is captured from the input map. The reverse association
+ * between a topic ID and a topic name is computed by this method. If there are more than one topic name
+ * resolving to the same topic ID, an {@link InvalidTopicException} is thrown.
+ */
+ public static TopicResolver fromTopicIds(Map<String, Uuid> topicIds) {
+ Map<Uuid, String> topicNames = new HashMap<>(topicIds.size());
+
+ for (Map.Entry<String, Uuid> e: topicIds.entrySet()) {
+ String conflicting = topicNames.putIfAbsent(e.getValue(), e.getKey());
+ if (conflicting != null) {
+ throw new InvalidTopicException(
+ "Topic " + e.getKey() + " shares the same ID " + e.getValue() + " as topic " + conflicting);
+ }
+ }
+
+ return new TopicResolver(topicIds, topicNames);
+ }
+
+ /**
+ * A resolver which acts as a wrapper around the input mapping of topic ids from/to topic names.
+ * No validation is performed about the consistency of the mapping. This method is to be preferred
+ * when the copy of the input maps needs to be avoided.
+ */
+ public static TopicResolver wrap(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) {
+ return new TopicResolver(topicIds, topicNames);
+ }
+
+ /**
+ * A resolver with no existing mapping between any topic name and id.
+ */
+ public static TopicResolver emptyResolver() {
+ return fromTopicIds(Collections.emptyMap());
+ }
+
+ private TopicResolver(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames) {
+ this.topicIds = Collections.unmodifiableMap(topicIds);
+ this.topicNames = Collections.unmodifiableMap(topicNames);
+ }
+
+ /**
+ * Returns the ID of the topic with the given name, if that association exists.
+ */
+ public Optional<Uuid> getTopicId(String name) {

Review Comment:
I wonder if using Optional is necessary here given that we always use `orNull` and `orDefault`. What do you think?

##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -35,6 +39,7 @@
* Possible error codes:
*
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
+ * - {@link Errors#UNKNOWN_TOPIC_ID}

Review Comment:
nit: Should we also add the other ones?
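Regarding the question on `TopicResolver#getTopicId` returning `Optional`: the call sites quoted in this PR unwrap it immediately with `orElse(Uuid.ZERO_UUID)`, `orElse(ZERO_UUID)` or `orElse(null)`. A minimal sketch of a getter-style alternative that returns those defaults directly follows. `PlainTopicResolverSketch`, `topicIdOrZero` and `topicNameOrNull` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the duplicate-ID check uses `java.lang.IllegalStateException` purely for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical alternative to TopicResolver's Optional-returning getters:
 * callers that always unwrap with orElse(ZERO_UUID) or orElse(null) get
 * those defaults directly. java.util.UUID stands in for Kafka's Uuid.
 */
public final class PlainTopicResolverSketch {

    public static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Map<String, UUID> idsByName;
    private final Map<UUID, String> namesById;

    public PlainTopicResolverSketch(Map<String, UUID> idsByName) {
        Map<UUID, String> reverse = new HashMap<>();
        for (Map.Entry<String, UUID> e : idsByName.entrySet()) {
            // Like TopicResolver.fromTopicIds, reject two names sharing one ID.
            String previous = reverse.putIfAbsent(e.getValue(), e.getKey());
            if (previous != null) {
                throw new IllegalStateException("Topics " + previous + " and " + e.getKey()
                    + " share the ID " + e.getValue());
            }
        }
        this.idsByName = Collections.unmodifiableMap(new HashMap<>(idsByName));
        this.namesById = Collections.unmodifiableMap(reverse);
    }

    // Returns ZERO_UUID when the name is unknown, replacing orElse(ZERO_UUID).
    public UUID topicIdOrZero(String name) {
        return idsByName.getOrDefault(name, ZERO_UUID);
    }

    // Returns null when the ID is unknown, replacing orElse(null).
    public String topicNameOrNull(UUID topicId) {
        return namesById.get(topicId);
    }
}
```

The trade-off is that the sentinel values become part of the method contract, whereas `Optional` forces each call site to handle the absent case explicitly.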