This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 0257f26 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) 0257f26 is described below commit 0257f266063c3bffaa690fe3e98a3c876be99002 Author: Randall Hauch <rha...@gmail.com> AuthorDate: Thu Mar 24 21:40:10 2022 -0500 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946) The `retryEndOffsets(…)` method in `TopicAdmin` was recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduced a regression when talking to older brokers (0.10.x or earlier). The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup. The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem. 
--- .../org/apache/kafka/connect/util/TopicAdmin.java | 4 ++++ .../apache/kafka/connect/util/TopicAdminTest.java | 28 +++++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index dfbca5b..fca066f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable { * must be 0 or more * @return the map of offset for each topic partition, or an empty map if the supplied partitions * are null or empty + * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets * @throws ConnectException if {@code timeoutDuration} is exhausted * @see TopicAdmin#endOffsets(Set) */ @@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable { () -> "list offsets for topic partitions", timeoutDuration, retryBackoffMs); + } catch (UnsupportedVersionException e) { + // Older brokers don't support this admin method, so rethrow it without wrapping it + throw e; } catch (Exception e) { throw new ConnectException("Failed to list offsets for topic partitions.", e); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index dd855e1..8bbad23 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -503,11 +503,33 @@ public class TopicAdminTest { KafkaFuture<ListOffsetsResultInfo> future = mockFuture(); when(future.get()).thenReturn(resultsInfo); when(results.partitionResult(eq(tp))).thenReturn(future); + } + /** + * TopicAdmin can be used 
to read the end offsets, but the admin client API used to do this was + * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers, + * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException. + */ + @Test + public void retryEndOffsetsShouldRethrowUnknownVersionException() { + String topicName = "myTopic"; + TopicPartition tp1 = new TopicPartition(topicName, 0); + Set<TopicPartition> tps = Collections.singleton(tp1); + Long offset = 1000L; + Cluster cluster = createCluster(1, topicName, 1); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); + // Expect the admin client list offsets will throw unsupported version, simulating older brokers + env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset)); + TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + // The retryEndOffsets should catch and rethrow an unsupported version exception + assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100), 1)); + } } @Test - public void retryEndOffsetsShouldThrowConnectException() { + public void retryEndOffsetsShouldWrapNonRetriableExceptionsWithConnectException() { String topicName = "myTopic"; TopicPartition tp1 = new TopicPartition(topicName, 0); Set<TopicPartition> tps = Collections.singleton(tp1); @@ -756,6 +778,10 @@ public class TopicAdminTest { MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED); } + private ListOffsetResponse listOffsetsResultWithUnsupportedVersion(TopicPartition tp1, Long offset1) { + return listOffsetResponse(tp1, Errors.UNSUPPORTED_VERSION, 1L, offset1, null); + } + private ListOffsetResponse listOffsetResponse(TopicPartition tp, long offset) { return listOffsetResponse(tp, Errors.NONE, 1L, offset, null); }