Hangleton commented on code in PR #13240:
URL: https://github.com/apache/kafka/pull/13240#discussion_r1205089112


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2707,11 +2742,271 @@ public void testCommitOffsetUnknownMemberId() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
+        prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_MEMBER_ID);
         assertThrows(CommitFailedException.class, () -> 
coordinator.commitOffsetsSync(singletonMap(t1p,
                 new OffsetAndMetadata(100L, "metadata")), 
time.timer(Long.MAX_VALUE)));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testCommitOffsetUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // Prepare five OffsetCommit requests which return a retriable 
UNKNOWN_TOPIC_ID error.
+        // Set the timeout accordingly so that commitOffsetsSync completes on 
the fifth attempt.
+        // Note that the timer (MockTime) only ticks when its sleep(long) 
method is invoked.
+        // Because the ConsumerNetworkClient does not call sleep() in its 
network-level poll(.), this
+        // timer never moves forward once the network client is invoked. If 
there is no available
+        // response to consume, its internal poll loop never completes. Hence, 
the timeout needs to be
+        // enforced in the ConsumerCoordinator, and we need to make sure there 
are enough responses
+        // queued in the MockClient to satisfy all invocations of the 
ConsumerNetworkClient#poll(.).
+        int offsetCommitCalls = 5;
+        long timeoutMs = rebalanceConfig.retryBackoffMs * offsetCommitCalls;
+
+        IntStream.range(0, offsetCommitCalls).forEach(__ ->
+            prepareOffsetCommitRequest(singletonMap(ti1p, 100L), 
Errors.UNKNOWN_TOPIC_ID));
+
+        // UnknownTopicIdException is retriable, hence will be retried by the 
coordinator as long as
+        // the timeout allows. Note that since topic ids are not part of the 
public API of the consumer,
+        // we cannot throw an UnknownTopicId to the user. By design, a false 
boolean indicating the
+        // offset commit failed is returned.
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertFalse(coordinator.commitOffsetsSync(offsets, 
time.timer(timeoutMs)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testRetryCommitUnknownTopicId(boolean commitSync) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.UNKNOWN_TOPIC_ID)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(ti1p, 
Errors.NONE)));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(
+            t1p,
+            new OffsetAndMetadata(100L, "metadata")
+        );
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(offsets, (inputOffsets, exception) 
-> {
+                // Unlike the commit offset sync API, the async API does not 
retry.
+                assertSame(inputOffsets, offsets);
+                assertEquals(RetriableCommitFailedException.class, 
exception.getClass());
+                assertEquals(UnknownTopicOrPartitionException.class, 
exception.getCause().getClass());
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+    }
+
+    static Stream<Arguments> commitOffsetTestArgs() {
+        Map<TopicIdPartition, Long> byTopicIdOffsets = new HashMap<>();
+        byTopicIdOffsets.put(ti1p, 100L);
+        byTopicIdOffsets.put(ti2p, 200L);
+
+        TopicIdPartition unknownTopicIdPartition =
+            new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("topic3", 5));
+
+        Map<TopicIdPartition, Long> byTopicNameOffsets = new HashMap<>();
+        byTopicNameOffsets.put(ti1p, 100L);
+        byTopicNameOffsets.put(ti2p, 200L);
+        byTopicNameOffsets.put(unknownTopicIdPartition, 300L);
+
+        OffsetCommitRequestData byTopicIdData = new OffsetCommitRequestData()
+            .setGroupId(groupId)
+            .setGenerationId(OffsetCommitRequest.DEFAULT_GENERATION_ID)
+            .setTopics(Arrays.asList(
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti1p.topicId())
+                    .setName(topic1)
+                    .setPartitions(singletonList(new 
OffsetCommitRequestPartition()
+                        .setPartitionIndex(t1p.partition())
+                        .setCommittedOffset(100L)
+                        .setCommittedMetadata("metadata"))),
+                new OffsetCommitRequestTopic()
+                    .setTopicId(ti2p.topicId())
+                    .setName(topic2)
+                    .setPartitions(singletonList(new 
OffsetCommitRequestPartition()
+                        .setPartitionIndex(t2p.partition())
+                        .setCommittedOffset(200L)
+                        .setCommittedMetadata("metadata")))
+            ));
+
+        OffsetCommitRequestData byTopicNameData = byTopicIdData.duplicate();
+        byTopicNameData.topics().add(new OffsetCommitRequestTopic()
+            .setName(unknownTopicIdPartition.topic())
+            .setPartitions(singletonList(new OffsetCommitRequestPartition()
+                .setPartitionIndex(5)
+                .setCommittedOffset(300L)
+                .setCommittedMetadata("metadata")))
+        );
+
+        return Stream.of(
+            Arguments.of(true, byTopicIdOffsets, byTopicIdData, (short) 9),
+            Arguments.of(false, byTopicIdOffsets, byTopicIdData, (short) 9),
+            Arguments.of(true, byTopicNameOffsets, byTopicNameData, (short) 8),
+            Arguments.of(false, byTopicNameOffsets, byTopicNameData, (short) 8)
+        );
+    }
+
+    private static Map<TopicPartition, OffsetAndMetadata> 
offsetAndMetadata(Map<TopicIdPartition, Long> offsets) {
+        return offsets.entrySet().stream()
+            .map(e -> new SimpleEntry<>(e.getKey().topicPartition(), new 
OffsetAndMetadata(e.getValue(), "metadata")))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    @ParameterizedTest
+    @MethodSource("commitOffsetTestArgs")
+    public void testTopicIdsArePopulatedByTheConsumerCoordinator(
+            boolean commitSync,
+            Map<TopicIdPartition, Long> offsets,
+            OffsetCommitRequestData expectedRequestData,
+            short expectedRequestVersion) {
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        OffsetCommitRequestCaptor captor = new OffsetCommitRequestCaptor();
+        prepareOffsetCommitRequest(offsets, Errors.NONE, false, captor);
+
+        Map<TopicPartition, OffsetAndMetadata> input = 
offsetAndMetadata(offsets);
+
+        if (commitSync) {
+            assertTrue(coordinator.commitOffsetsSync(input, 
time.timer(Long.MAX_VALUE)));
+
+        } else {
+            AtomicBoolean callbackInvoked = new AtomicBoolean();
+            coordinator.commitOffsetsAsync(input, (inputOffsets, exception) -> 
{
+                // Notes:
+                // 1) The offsets passed to the callback are the same object 
provided to the offset commit method.
+                //    The validation on the offsets is not required but 
defensive.
+                // 2) We validate that the commit was successful, which is the 
case if the exception is null.
+                // 3) We validate this callback was invoked, which is not 
necessary but defensive.
+                assertSame(inputOffsets, input);
+                assertNull(exception);
+                callbackInvoked.set(true);
+            });
+
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+            assertTrue(callbackInvoked.get());
+        }
+
+        // The consumer does not provide a guarantee on the order of 
occurrence of topics and partitions in the
+        // OffsetCommit request, since a map of offsets is provided to the 
consumer API. Here, both requests
+        // are asserted to be identical irrespective of the order in which 
topic and partitions appear in the requests.
+        assertRequestEquals(
+            new OffsetCommitRequest(expectedRequestData, 
expectedRequestVersion),
+            captor.request
+        );
+    }
+
+    @ParameterizedTest
+    @NullSource
+    @ValueSource(strings = { "", "test1" })
+    public void 
testInvalidTopicIdReturnedByBrokerWhenCommittingOffsetSync(String topicName) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new 
OffsetAndMetadata(100L, "m"));
+
+        // The following offset commit response is valid and the authorization 
failure results in failing the
+        // offset commit invocation.
+        client.prepareResponse(offsetCommitResponse(topicName, ti1p.topicId(), 
Errors.GROUP_AUTHORIZATION_FAILED));
+        assertThrows(GroupAuthorizationException.class,
+            () -> coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));
+
+        // The following offset commit response defines a topic incorrectly. 
The coordinator ignores the topic,
+        // and the group authorization failure is therefore not propagated.
+        client.prepareResponse(offsetCommitResponse(topicName, Uuid.ZERO_UUID, 
Errors.GROUP_AUTHORIZATION_FAILED));
+        assertTrue(coordinator.commitOffsetsSync(offsets, 
time.timer(Long.MAX_VALUE)));

Review Comment:
   Just to clarify, do you mean the commit offsets method should return false 
when at least 1 over n > 1 could not be committed due to topic id mismatch, or 
when n == 1 could not be committed for the same reason?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to