This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 346cc48de9a3152e3ffa64f8432fc878e9be0418
Author:     Philip Nee <philip...@gmail.com>
AuthorDate: Wed Mar 9 10:39:28 2022 -0800

    KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)

    Fixes the compatibility issue reported in KAFKA-12879 by reverting the changes to the
    admin client from KAFKA-12339 (#10152) that retried admin client operations, and instead
    performing the retries within Connect's `KafkaBasedLog` during startup via a new
    `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing
    `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the
    retry timeout elapses.

    This change should be backward compatible with KAFKA-12339: when Connect's
    `KafkaBasedLog` starts up, it will retry attempts to read the end offsets for the
    log's topic. The `KafkaBasedLog`'s existing worker thread already has its own retry
    logic, and that is unchanged.

    Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate
    the parameterized retry logic around any supplied function. Two short usage sketches
    of the new retry API follow the diff below.
---
 .../admin/internals/MetadataOperationContext.java  |   1 -
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  42 +----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  27 ++-
 .../org/apache/kafka/connect/util/RetryUtil.java   | 101 +++++++++++
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  29 +++-
 .../kafka/connect/util/KafkaBasedLogTest.java      |  14 +-
 .../apache/kafka/connect/util/RetryUtilTest.java   | 184 +++++++++++++++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  |  59 ++++++-
 8 files changed, 402 insertions(+), 55 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e7f2c07..c05e5cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,7 +82,6 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
-            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6eb972b..b485432 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -445,16 +445,12 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
-        return prepareMetadataResponse(cluster, error, error);
-    }
-
-    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponsePartition pm = new MetadataResponsePartition()
-                    .setErrorCode(partitionError.code())
+                    .setErrorCode(error.code())
                     .setPartitionIndex(pInfo.partition())
                     .setLeaderId(pInfo.leader().id())
                     .setLeaderEpoch(234)
@@ -464,7 +460,7 @@ public class KafkaAdminClientTest {
             pms.add(pm);
         }
         MetadataResponseTopic tm = new MetadataResponseTopic()
-            .setErrorCode(topicError.code())
+            .setErrorCode(error.code())
             .setName(topic)
             .setIsInternal(false)
             .setPartitions(pms);
@@ -3913,40 +3909,6 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
-        Node node = new Node(0, "localhost", 8120);
-        List<Node> nodes = Collections.singletonList(node);
-        final Cluster cluster = new Cluster(
-            "mockClusterId",
-            nodes,
-            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
-            Collections.emptySet(),
-            Collections.emptySet(),
-            node);
-        final TopicPartition tp0 = new TopicPartition("foo", 0);
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
-            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
-            // listoffsets response from broker 0
-            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(Collections.singletonList(ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321)));
-            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
-
-            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
-
-            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
-            assertEquals(1, offsets.size());
-            assertEquals(123L, offsets.get(tp0).offset());
-            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
-            assertEquals(-1L, offsets.get(tp0).timestamp());
-        }
-    }
-
-    @Test
     public void testListOffsetsRetriableErrors() throws Exception {
         Node node0 = new Node(0, "localhost", 8120);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6e2350f..6649279 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -76,6 +76,10 @@ public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
     private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
     private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
+    // 15 minutes of admin retry duration to ensure successful metadata propagation,
+    // with 10 seconds of backoff in between retries
+    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
+    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
 
     private Time time;
     private final String topic;
@@ -194,7 +198,7 @@
         // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
         consumer.seekToBeginning(partitions);
 
-        readToLogEnd();
+        readToLogEnd(true);
 
         thread = new WorkThread();
         thread.start();
@@ -319,9 +323,16 @@
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets of the Kafka log's topic partitions, optionally retrying
+     * if the {@code listOffsets()} method of the admin client throws a {@link RetriableException}.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client {@code listOffsets()} call.
+     * @see TopicAdmin#retryEndOffsets
+     */
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment, shouldRetry);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
@@ -345,7 +356,7 @@
     }
 
     // Visible for testing
-    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment, boolean shouldRetry) {
         log.trace("Reading to end of offset log");
 
         // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
@@ -360,6 +371,12 @@
         // Use the admin client to immediately find the end offsets for the assigned topic partitions.
         // Unlike using the consumer
         try {
+            if (shouldRetry) {
+                return admin.retryEndOffsets(assignment,
+                        ADMIN_CLIENT_RETRY_DURATION,
+                        ADMIN_CLIENT_RETRY_BACKOFF_MS);
+            }
+
             return admin.endOffsets(assignment);
         } catch (UnsupportedVersionException e) {
             // This may happen with really old brokers that don't support the auto topic creation
@@ -395,7 +412,7 @@
 
             if (numCallbacks > 0) {
                 try {
-                    readToLogEnd();
+                    readToLogEnd(false);
                     log.trace("Finished read to end log for topic {}", topic);
                 } catch (TimeoutException e) {
                     log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
new file mode 100644
index 0000000..9463f6a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown. If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable, since the message
+     * will be used for logging. For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed if {@code timeoutDuration}
+     * is 0 or negative, or if {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable        the function to execute
+     * @param description     supplier that provides a custom message for logging purposes
+     * @param timeoutDuration timeout duration; may be null, in which case the callable executes only once
+     * @param retryBackoffMs  the number of milliseconds to delay upon receiving a
+     *                        {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException if the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or string is provided, the message will default to "callable"
+        final String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        final long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs < 0) {
+            log.debug("Assuming no retry backoff since retryBackoffMs={} is negative", retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since timeoutMs={} is not larger than retryBackoffMs={}",
+                    descriptionStr, timeoutMs, retryBackoffMs);
+            return callable.call();
+        }
+
+        final long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        do {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " +
" + + "Reason: {}", attempt, descriptionStr, e.getMessage(), e); + lastError = e; + } catch (WakeupException e) { + lastError = e; + } + + long millisRemaining = Math.max(0, end - System.currentTimeMillis()); + if (millisRemaining < retryBackoffMs) { + // exit when the time remaining is less than retryBackoffMs + break; + } + Utils.sleep(retryBackoffMs); + } while (System.currentTimeMillis() < end); + + throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts. Reason: " + lastError.getMessage(), lastError); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index 9661c69..86a3ea2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -655,7 +655,7 @@ public class TopicAdmin implements AutoCloseable { * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated * by {@code request.timeout.ms} expires, and this call can be retried * @throws LeaderNotAvailableException if the leader was not available and this call can be retried - * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting + * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting * to perform this operation * @throws ConnectException if a non retriable error occurs */ @@ -703,6 +703,33 @@ public class TopicAdmin implements AutoCloseable { return result; } + /** + * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects, and performs retry when + * {@link org.apache.kafka.connect.errors.RetriableException} is thrown. 
+     *
+     * @param partitions      the topic partitions
+     * @param timeoutDuration timeout duration; may not be null
+     * @param retryBackoffMs  the number of milliseconds to delay upon receiving a
+     *                        {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                        must be 0 or more
+     * @return the map of offsets for each topic partition, or an empty map if the supplied partitions
+     *         are null or empty
+     * @throws ConnectException if {@code timeoutDuration} is exhausted
+     * @see TopicAdmin#endOffsets(Set)
+     */
+    public Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeoutDuration, long retryBackoffMs) {
+
+        try {
+            return RetryUtil.retryUntilTimeout(
+                    () -> endOffsets(partitions),
+                    () -> "list offsets for topic partitions",
+                    timeoutDuration,
+                    retryBackoffMs);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to list offsets for topic partitions.", e);
+        }
+    }
+
     @Override
     public void close() {
         admin.close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e36f2a9..9d38e44 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -478,13 +478,15 @@ public class KafkaBasedLogTest {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
         admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
 
         PowerMock.replayAll();
 
         store.start();
-        assertEquals(endOffsets, store.readEndOffsets(tps));
+        assertEquals(endOffsets, store.readEndOffsets(tps, false));
     }
 
     @Test
@@ -495,7 +497,7 @@ public class KafkaBasedLogTest {
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
 
         // Getting end offsets using the admin client should fail with unsupported version
-        admin.endOffsets(EasyMock.eq(tps));
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
         PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
 
         // Falls back to the consumer
@@ -507,7 +509,7 @@
         PowerMock.replayAll();
 
         store.start();
-        assertEquals(endOffsets, store.readEndOffsets(tps));
+        assertEquals(endOffsets, store.readEndOffsets(tps, false));
     }
 
     @Test
@@ -521,7 +523,7 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         // Getting end offsets upon startup should work fine
-        admin.endOffsets(EasyMock.eq(tps));
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
         PowerMock.expectLastCall().andReturn(endOffsets).times(1);
         // Getting end offsets using the admin client should fail with leader not available
         admin.endOffsets(EasyMock.eq(tps));
@@ -530,7 +532,7 @@
         PowerMock.replayAll();
 
         store.start();
-        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps));
+        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false));
     }
 
     @SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
new file mode 100644
index 0000000..05f0212
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    private Callable<String> mockCallable;
+    private final Supplier<String> testMsg = () -> "Test";
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        mockCallable = Mockito.mock(Callable.class);
+    }
+
+    @Test
+    public void testSuccess() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    // timeout the test after 1000ms if unable to complete within a reasonable time frame
+    @Test(timeout = 1000)
+    public void testExhaustingRetries() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+    }
+
+    @Test
+    public void retriesEventuallySucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1));
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void failWithNonRetriableException() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new NullPointerException("Non retriable"));
+
+        NullPointerException e = assertThrows(NullPointerException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0));
+        assertEquals("Non retriable", e.getMessage());
+        Mockito.verify(mockCallable, Mockito.times(6)).call();
+    }
+
+    @Test
+    public void noRetryAndSucceed() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(0), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void noRetryAndFailed() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        TimeoutException e = assertThrows(TimeoutException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(0), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+        assertEquals("timeout exception", e.getMessage());
+    }
+
+    @Test
+    public void testNoBackoffTimeAndSucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0));
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void testNoBackoffTimeAndFail() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(80), 0));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+        assertTrue(e.getMessage().contains("Reason: timeout exception"));
+    }
+
+    @Test
+    public void testBackoffMoreThanTimeoutWillOnlyExecuteOnce() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        TimeoutException e = assertThrows(TimeoutException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void testInvalidTimeDuration() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, null, 10));
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(-1), 10));
+        Mockito.verify(mockCallable, Mockito.times(2)).call();
+    }
+
+    @Test
+    public void testInvalidRetryTimeout() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenReturn("success");
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1));
+        Mockito.verify(mockCallable, Mockito.times(2)).call();
+    }
+
+    @Test
+    public void testSupplier() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, null, Duration.ofMillis(100), 10));
+        assertTrue(e.getMessage().startsWith("Fail to callable"));
+
+        e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, () -> null, Duration.ofMillis(100), 10));
+        assertTrue(e.getMessage().startsWith("Fail to callable"));
+
+        e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, () -> "execute lambda", Duration.ofMillis(500), 10));
+        assertTrue(e.getMessage().startsWith("Fail to execute lambda"));
+        Mockito.verify(mockCallable, Mockito.atLeast(3)).call();
+    }
+
+    @Test
+    public void testWakeupException() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new WakeupException());
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index edd9891..72a808e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -53,6 +54,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -466,6 +468,55 @@ public class TopicAdminTest {
     }
 
     @Test
+    public void retryEndOffsetsShouldThrowConnectException() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+
+            assertThrows(ConnectException.class, () -> {
+                admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            });
+        }
+    }
+
+    @Test
+    public void retryEndOffsetsShouldRetryWhenTopicNotFound() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig,
+                env.adminClient());
+            Map<TopicPartition, Long> endoffsets = admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            assertNotNull(endoffsets);
+            assertTrue(endoffsets.containsKey(tp1));
+            assertEquals(1000L, endoffsets.get(tp1).longValue());
+        }
+    }
+
+    @Test
     public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
@@ -635,12 +686,16 @@ public class TopicAdminTest {
     }
 
     private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        return prepareMetadataResponse(cluster, error, error);
+    }
+
+    private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponseData.MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponseData.MetadataResponsePartition pm = new MetadataResponseData.MetadataResponsePartition()
-                    .setErrorCode(error.code())
+                    .setErrorCode(partitionError.code())
                     .setPartitionIndex(pInfo.partition())
                     .setLeaderId(pInfo.leader().id())
                     .setLeaderEpoch(234)
@@ -650,7 +705,7 @@ public class TopicAdminTest {
             pms.add(pm);
         }
         MetadataResponseTopic tm = new MetadataResponseTopic()
-            .setErrorCode(error.code())
+            .setErrorCode(topicError.code())
             .setName(topic)
             .setIsInternal(false)
             .setPartitions(pms);
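
To make the new API concrete, here is a minimal usage sketch of the RetryUtil.retryUntilTimeout(...)
helper added by this patch. The OffsetReader class and fetchEndOffsets method are hypothetical names
used only for illustration; the helper's semantics -- execute at least once, retry on RetriableException,
and wrap the last error in a ConnectException once the timeout window is exhausted -- are those
implemented in RetryUtil above.

    package org.apache.kafka.connect.util;

    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Map;
    import java.util.Set;

    public class OffsetReader {
        // Hypothetical caller: run endOffsets() at least once, retrying on
        // RetriableException for up to 30 seconds with 500 ms between attempts.
        public static Map<TopicPartition, Long> fetchEndOffsets(TopicAdmin admin,
                                                                Set<TopicPartition> partitions) throws Exception {
            return RetryUtil.retryUntilTimeout(
                    () -> admin.endOffsets(partitions),        // Callable<T> to execute
                    () -> "list offsets for topic partitions", // description used in logs and errors
                    Duration.ofSeconds(30),                    // total retry window
                    500);                                      // backoff between attempts, in ms
        }
    }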
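
And a sketch of the startup call path this patch adds: KafkaBasedLog.readToLogEnd(true) ends up
invoking TopicAdmin.retryEndOffsets(...) with the two new constants. The admin and assignment
variables below stand in for the log's TopicAdmin instance and the consumer's current assignment;
the duration and backoff values are the ones defined above as ADMIN_CLIENT_RETRY_DURATION and
ADMIN_CLIENT_RETRY_BACKOFF_MS.

    // During KafkaBasedLog.start(), end offsets are fetched with retries enabled:
    // a RetriableException from listOffsets() triggers another attempt every
    // 10 seconds, for up to 15 minutes, before failing with a ConnectException.
    Map<TopicPartition, Long> endOffsets = admin.retryEndOffsets(
            assignment,                      // partitions assigned to the log's consumer
            Duration.ofMinutes(15),          // ADMIN_CLIENT_RETRY_DURATION
            TimeUnit.SECONDS.toMillis(10));  // ADMIN_CLIENT_RETRY_BACKOFF_MS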