This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 346cc48de9a3152e3ffa64f8432fc878e9be0418
Author: Philip Nee <philip...@gmail.com>
AuthorDate: Wed Mar 9 10:39:28 2022 -0800

    KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
    
    Fixes the compatibility issue tracked by KAFKA-12879 by reverting the changes to the
    admin client from KAFKA-12339 (#10152) that retried admin client operations, and
    instead performing the retries within Connect's `KafkaBasedLog` during startup via a
    new `TopicAdmin.retryEndOffsets(...)` method. This method delegates to the existing
    `TopicAdmin.endOffsets(...)` method, but retries on `RetriableException` until the
    retry timeout elapses.
    
    This change should be backward compatible with KAFKA-12339: when Connect's
    `KafkaBasedLog` starts up, it will still retry attempts to read the end offsets for
    the log's topic. The `KafkaBasedLog`'s existing work thread already has its own
    retry logic, which is not changed here.
    
    Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate
    the parameterized retry logic around any supplied function.
---
 .../admin/internals/MetadataOperationContext.java  |   1 -
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  42 +----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  27 ++-
 .../org/apache/kafka/connect/util/RetryUtil.java   | 101 +++++++++++
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  29 +++-
 .../kafka/connect/util/KafkaBasedLogTest.java      |  14 +-
 .../apache/kafka/connect/util/RetryUtilTest.java   | 184 +++++++++++++++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  |  59 ++++++-
 8 files changed, 402 insertions(+), 55 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e7f2c07..c05e5cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,7 +82,6 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
-            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6eb972b..b485432 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -445,16 +445,12 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
-        return prepareMetadataResponse(cluster, error, error);
-    }
-
-    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponsePartition pm  = new MetadataResponsePartition()
-                    .setErrorCode(partitionError.code())
+                    .setErrorCode(error.code())
                     .setPartitionIndex(pInfo.partition())
                     .setLeaderId(pInfo.leader().id())
                     .setLeaderEpoch(234)
@@ -464,7 +460,7 @@ public class KafkaAdminClientTest {
                 pms.add(pm);
             }
             MetadataResponseTopic tm = new MetadataResponseTopic()
-                .setErrorCode(topicError.code())
+                .setErrorCode(error.code())
                 .setName(topic)
                 .setIsInternal(false)
                 .setPartitions(pms);
@@ -3913,40 +3909,6 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
-        Node node = new Node(0, "localhost", 8120);
-        List<Node> nodes = Collections.singletonList(node);
-        final Cluster cluster = new Cluster(
-            "mockClusterId",
-            nodes,
-            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
-            Collections.emptySet(),
-            Collections.emptySet(),
-            node);
-        final TopicPartition tp0 = new TopicPartition("foo", 0);
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
-            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
-            // listoffsets response from broker 0
-            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(Collections.singletonList(ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321)));
-            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
-
-            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
-
-            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
-            assertEquals(1, offsets.size());
-            assertEquals(123L, offsets.get(tp0).offset());
-            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
-            assertEquals(-1L, offsets.get(tp0).timestamp());
-        }
-    }
-
-    @Test
     public void testListOffsetsRetriableErrors() throws Exception {
 
         Node node0 = new Node(0, "localhost", 8120);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6e2350f..6649279 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -76,6 +76,10 @@ public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
     private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
     private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
+    // 15min of admin retry duration to ensure successful metadata propagation.  10 seconds of backoff
+    // in between retries
+    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
+    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
 
     private Time time;
     private final String topic;
@@ -194,7 +198,7 @@ public class KafkaBasedLog<K, V> {
         // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
         consumer.seekToBeginning(partitions);
 
-        readToLogEnd();
+        readToLogEnd(true);
 
         thread = new WorkThread();
         thread.start();
@@ -319,9 +323,16 @@ public class KafkaBasedLog<K, V> {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets of the Kafka log's topic partitions, optionally retrying
+     * if the {@code listOffsets()} method of the admin client throws a {@link RetriableException}.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client {@code listOffsets()} call.
+     * @see TopicAdmin#retryEndOffsets
+     */
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment, shouldRetry);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
@@ -345,7 +356,7 @@ public class KafkaBasedLog<K, V> {
     }
 
     // Visible for testing
-    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment, boolean shouldRetry) {
         log.trace("Reading to end of offset log");
 
         // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
@@ -360,6 +371,12 @@ public class KafkaBasedLog<K, V> {
             // Use the admin client to immediately find the end offsets for the assigned topic partitions.
             // Unlike using the consumer
             try {
+                if (shouldRetry) {
+                    return admin.retryEndOffsets(assignment,
+                            ADMIN_CLIENT_RETRY_DURATION,
+                            ADMIN_CLIENT_RETRY_BACKOFF_MS);
+                }
+
                 return admin.endOffsets(assignment);
             } catch (UnsupportedVersionException e) {
                // This may happen with really old brokers that don't support the auto topic creation
@@ -395,7 +412,7 @@ public class KafkaBasedLog<K, V> {
 
                     if (numCallbacks > 0) {
                         try {
-                            readToLogEnd();
+                            readToLogEnd(false);
                             log.trace("Finished read to end log for topic {}", 
topic);
                         } catch (TimeoutException e) {
                             log.warn("Timeout while reading log to end for 
topic '{}'. Retrying automatically. " +
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
new file mode 100644
index 0000000..9463f6a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable, since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides a custom message for logging purposes
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or string is provided, the message will default to "callable"
+        final String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        final long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs < 0) {
+            log.debug("Assuming no retry backoff since retryBackoffMs={} is 
negative", retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since timeoutMs={} is not 
larger than retryBackoffMs={}",
+                    descriptionStr, timeoutMs, retryBackoffMs);
+            return callable.call();
+        }
+
+        final long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        do {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " +
+                        "Reason: {}", attempt, descriptionStr, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
+            if (millisRemaining < retryBackoffMs) {
+                // exit when the time remaining is less than retryBackoffMs
+                break;
+            }
+            Utils.sleep(retryBackoffMs);
+        } while (System.currentTimeMillis() < end);
+
+        throw new ConnectException("Fail to " + descriptionStr + " after " + 
attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);
+    }
+}
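
A hedged usage sketch of the utility above: `admin` and `partitions` are placeholder
names, and the timeout/backoff values are illustrative. Note that `retryUntilTimeout`
declares `throws Exception`, so callers are expected to wrap it, as
`TopicAdmin.retryEndOffsets` does in the next file:

    // Retry the callable on RetriableException (or WakeupException) until the
    // timeout budget is spent, sleeping retryBackoffMs between attempts.
    try {
        Map<TopicPartition, Long> offsets = RetryUtil.retryUntilTimeout(
                () -> admin.endOffsets(partitions),        // Callable<T> to retry
                () -> "list offsets for topic partitions", // description for log/error messages
                Duration.ofMillis(100),                    // total timeout (illustrative)
                10);                                       // retryBackoffMs (illustrative)
    } catch (Exception e) {
        throw new ConnectException("Failed to list offsets for topic partitions.", e);
    }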
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 9661c69..86a3ea2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -655,7 +655,7 @@ public class TopicAdmin implements AutoCloseable {
      * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
      *         by {@code request.timeout.ms} expires, and this call can be retried
      * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
-     * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting 
+     * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting
      *         to perform this operation
      * @throws ConnectException if a non retriable error occurs
      */
@@ -703,6 +703,33 @@ public class TopicAdmin implements AutoCloseable {
         return result;
     }
 
+    /**
+     * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects, and perform retries when
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.
+     *
+     * @param partitions        the topic partitions
+     * @param timeoutDuration   timeout duration; may not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @return                  the map of offsets for each topic partition, or an empty map if the supplied partitions
+     *                          are null or empty
+     * @throws ConnectException if {@code timeoutDuration} is exhausted
+     * @see TopicAdmin#endOffsets(Set)
+     */
+    public Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeoutDuration, long retryBackoffMs) {
+
+        try {
+            return RetryUtil.retryUntilTimeout(
+                    () -> endOffsets(partitions),
+                    () -> "list offsets for topic partitions",
+                    timeoutDuration,
+                    retryBackoffMs);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to list offsets for topic 
partitions.", e);
+        }
+    }
+
     @Override
     public void close() {
         admin.close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index e36f2a9..9d38e44 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -478,13 +478,15 @@ public class KafkaBasedLogTest {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
         admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
 
         PowerMock.replayAll();
 
         store.start();
-        assertEquals(endOffsets, store.readEndOffsets(tps));
+        assertEquals(endOffsets, store.readEndOffsets(tps, false));
     }
 
     @Test
@@ -495,7 +497,7 @@ public class KafkaBasedLogTest {
 
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         // Getting end offsets using the admin client should fail with unsupported version
-        admin.endOffsets(EasyMock.eq(tps));
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
         PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
 
         // Falls back to the consumer
@@ -507,7 +509,7 @@ public class KafkaBasedLogTest {
         PowerMock.replayAll();
 
         store.start();
-        assertEquals(endOffsets, store.readEndOffsets(tps));
+        assertEquals(endOffsets, store.readEndOffsets(tps, false));
     }
 
     @Test
@@ -521,7 +523,7 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         // Getting end offsets upon startup should work fine
-        admin.endOffsets(EasyMock.eq(tps));
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
         PowerMock.expectLastCall().andReturn(endOffsets).times(1);
         // Getting end offsets using the admin client should fail with leader not available
         admin.endOffsets(EasyMock.eq(tps));
@@ -530,7 +532,7 @@ public class KafkaBasedLogTest {
         PowerMock.replayAll();
 
         store.start();
-        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps));
+        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false));
     }
 
     @SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
new file mode 100644
index 0000000..05f0212
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    private Callable<String> mockCallable;
+    private final Supplier<String> testMsg = () -> "Test";
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        mockCallable = Mockito.mock(Callable.class);
+    }
+
+    @Test
+    public void testSuccess() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(100), 1));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    // timeout the test after 1000ms if unable to complete within a reasonable time frame
+    @Test(timeout = 1000)
+    public void testExhaustingRetries() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+    }
+
+    @Test
+    public void retriesEventuallySucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(100), 1));
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void failWithNonRetriableException() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new NullPointerException("Non retriable"));
+        NullPointerException e = assertThrows(NullPointerException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0));
+        assertEquals("Non retriable", e.getMessage());
+        Mockito.verify(mockCallable, Mockito.times(6)).call();
+    }
+
+    @Test
+    public void noRetryAndSucceed() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(0), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void noRetryAndFailed() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        TimeoutException e = assertThrows(TimeoutException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(0), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+        assertEquals("timeout exception", e.getMessage());
+    }
+
+    @Test
+    public void testNoBackoffTimeAndSucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(50), 0));
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void testNoBackoffTimeAndFail() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(80), 0));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+        assertTrue(e.getMessage().contains("Reason: timeout exception"));
+    }
+
+    @Test
+    public void testBackoffMoreThanTimeoutWillOnlyExecuteOnce() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        TimeoutException e = assertThrows(TimeoutException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void testInvalidTimeDuration() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, null, 10));
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(-1), 10));
+        Mockito.verify(mockCallable, Mockito.times(2)).call();
+    }
+
+    @Test
+    public void testInvalidRetryTimeout() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenReturn("success");
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, 
testMsg, Duration.ofMillis(100), -1));
+        Mockito.verify(mockCallable, Mockito.times(2)).call();
+    }
+
+    @Test
+    public void testSupplier() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, null, Duration.ofMillis(100), 10));
+        assertTrue(e.getMessage().startsWith("Fail to callable"));
+
+        e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, () -> null, Duration.ofMillis(100), 10));
+        assertTrue(e.getMessage().startsWith("Fail to callable"));
+
+        e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, () -> "execute 
lambda", Duration.ofMillis(500), 10));
+        assertTrue(e.getMessage().startsWith("Fail to execute lambda"));
+        Mockito.verify(mockCallable, Mockito.atLeast(3)).call();
+    }
+
+    @Test
+    public void testWakeupException() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new WakeupException());
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index edd9891..72a808e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -53,6 +54,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -466,6 +468,55 @@ public class TopicAdminTest {
     }
 
     @Test
+    public void retryEndOffsetsShouldThrowConnectException() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+
+            assertThrows(ConnectException.class, () -> {
+                admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            });
+        }
+    }
+
+    @Test
+    public void retryEndOffsetsShouldRetryWhenTopicNotFound() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+            Map<TopicPartition, Long> endoffsets = admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            assertNotNull(endoffsets);
+            assertTrue(endoffsets.containsKey(tp1));
+            assertEquals(1000L, endoffsets.get(tp1).longValue());
+        }
+    }
+
+    @Test
     public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
@@ -635,12 +686,16 @@ public class TopicAdminTest {
     }
 
     private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        return prepareMetadataResponse(cluster, error, error);
+    }
+
+    private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponseData.MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponseData.MetadataResponsePartition pm  = new MetadataResponseData.MetadataResponsePartition()
-                        .setErrorCode(error.code())
+                        .setErrorCode(partitionError.code())
                         .setPartitionIndex(pInfo.partition())
                         .setLeaderId(pInfo.leader().id())
                         .setLeaderEpoch(234)
@@ -650,7 +705,7 @@ public class TopicAdminTest {
                 pms.add(pm);
             }
             MetadataResponseTopic tm = new MetadataResponseTopic()
-                    .setErrorCode(error.code())
+                    .setErrorCode(topicError.code())
                     .setName(topic)
                     .setIsInternal(false)
                     .setPartitions(pms);

Reply via email to