This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 0189aa6 KAFKA-13770: Restore compatibility with KafkaBasedLog using
older Kafka brokers (#11946)
0189aa6 is described below
commit 0189aa62882a2bd60c2beb1507176864ad253abb
Author: Randall Hauch <[email protected]>
AuthorDate: Thu Mar 24 21:40:10 2022 -0500
KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka
brokers (#11946)
The `retryEndOffsets(…)` method in `TopicAdmin` was recently added
(KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any
failures reading the last offsets for a topic. However, this introduced a
regression when talking to older brokers (0.10.x or earlier).
The `KafkaBasedLog` already had logic that expected an
`UnsupportedVersionException` thrown by the admin client when a Kafka API is
not available on an older broker, but the new retry logic in `TopicAdmin` did
not account for this and wrapped the exception, thereby breaking the
`KafkaBasedLog` logic and preventing startup.
The fix is to propagate this `UnsupportedVersionException` from the
`TopicAdmin.retryEndOffsets(…)` method without wrapping it. A new unit test
was added that reproduces the problem without the fix and verifies that the
fix corrects it.
---
.../org/apache/kafka/connect/util/TopicAdmin.java | 4 +++
.../apache/kafka/connect/util/TopicAdminTest.java | 32 +++++++++++++++++++++-
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 4e50eab..df39250 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable {
* must be 0 or more
* @return the map of offset for each topic partition, or
an empty map if the supplied partitions
* are null or empty
+ * @throws UnsupportedVersionException if the broker is too old to support
the admin client API to read end offsets
* @throws ConnectException if {@code timeoutDuration} is exhausted
* @see TopicAdmin#endOffsets(Set)
*/
@@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable {
() -> "list offsets for topic partitions",
timeoutDuration,
retryBackoffMs);
+ } catch (UnsupportedVersionException e) {
+ // Older brokers don't support this admin method, so rethrow it
without wrapping it
+ throw e;
} catch (Exception e) {
throw new ConnectException("Failed to list offsets for topic
partitions.", e);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 9f4f384..0ca8ba2 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -505,11 +505,33 @@ public class TopicAdminTest {
KafkaFuture<ListOffsetsResultInfo> future = mockFuture();
when(future.get()).thenReturn(resultsInfo);
when(results.partitionResult(eq(tp))).thenReturn(future);
+ }
+ /**
+ * TopicAdmin can be used to read the end offsets, but the admin client
API used to do this was
+ * added to the broker in 0.11.0.0. This means that if Connect talks to
older brokers,
+ * the admin client cannot be used to read end offsets, and will throw an
UnsupportedVersionException.
+ */
+ @Test
+ public void retryEndOffsetsShouldRethrowUnknownVersionException() {
+ String topicName = "myTopic";
+ TopicPartition tp1 = new TopicPartition(topicName, 0);
+ Set<TopicPartition> tps = Collections.singleton(tp1);
+ Long offset = 1000L;
+ Cluster cluster = createCluster(1, topicName, 1);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new
MockTime(), cluster)) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster,
Errors.NONE));
+ // Expect the admin client list offsets will throw unsupported
version, simulating older brokers
+
env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1,
offset));
+ TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+ // The retryEndOffsets should catch and rethrow an unsupported
version exception
+ assertThrows(UnsupportedVersionException.class, () ->
admin.retryEndOffsets(tps, Duration.ofMillis(100), 1));
+ }
}
@Test
- public void retryEndOffsetsShouldThrowConnectException() {
+ public void
retryEndOffsetsShouldWrapNonRetriableExceptionsWithConnectException() {
String topicName = "myTopic";
TopicPartition tp1 = new TopicPartition(topicName, 0);
Set<TopicPartition> tps = Collections.singleton(tp1);
@@ -758,6 +780,14 @@ public class TopicAdminTest {
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
}
+ private ListOffsetResponse
listOffsetsResultWithUnsupportedVersion(TopicPartition tp1, Long offset1) {
+ return listOffsetResponse(
+ Collections.singletonMap(tp1, offset1),
+ Errors.UNSUPPORTED_VERSION,
+ 1L,
+ ListOffsetResponse.UNKNOWN_EPOCH
+ );
+ }
private ListOffsetResponse listOffsetsResult(TopicPartition tp1, Long
offset1) {
return listOffsetResponse(Collections.singletonMap(tp1, offset1),
Errors.NONE, 1L, ListOffsetResponse.UNKNOWN_EPOCH);
}