This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8dd697b05f3 KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks (#13276)
8dd697b05f3 is described below

commit 8dd697b05f36c319bd6c50d6e499168aac172e02
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Feb 28 21:06:34 2023 +0530

    KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks (#13276)
    
    Reviewers: Chaitanya Mukka <[email protected]>, Chris Egerton <[email protected]>
---
 .../runtime/distributed/DistributedHerder.java     |  49 +++++--
 .../runtime/distributed/DistributedHerderTest.java | 155 ++++++++++++++++++++-
 2 files changed, 191 insertions(+), 13 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b68cc443ea7..8a8d8dd95df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.CumulativeSum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
@@ -147,7 +148,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(10);
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(1);
-    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS = 
250;
+    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 
60000;
     private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
     private static final short BACKOFF_RETRIES = 5;
@@ -251,7 +253,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                              List<String> restNamespace,
                              AutoCloseable... uponShutdown) {
         this(config, worker, worker.workerId(), kafkaClusterId, 
statusBackingStore, configBackingStore, null, restUrl, restClient, 
worker.metrics(),
-                time, connectorClientConfigOverridePolicy, restNamespace, 
uponShutdown);
+                time, connectorClientConfigOverridePolicy, restNamespace, 
null, uponShutdown);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -269,6 +271,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       Time time,
                       ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy,
                       List<String> restNamespace,
+                      ExecutorService forwardRequestExecutor,
                       AutoCloseable... uponShutdown) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, 
configBackingStore, connectorClientConfigOverridePolicy);
 
@@ -302,9 +305,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 ThreadUtils.createThreadFactory(
                         this.getClass().getSimpleName() + "-" + clientId + 
"-%d", false));
 
-        this.forwardRequestExecutor = Executors.newFixedThreadPool(1,
-                ThreadUtils.createThreadFactory(
-                        "ForwardRequestExecutor-" + clientId + "-%d", false));
+        this.forwardRequestExecutor = forwardRequestExecutor != null
+                ? forwardRequestExecutor
+                : Executors.newFixedThreadPool(
+                        1,
+                        
ThreadUtils.createThreadFactory("ForwardRequestExecutor-" + clientId + "-%d", 
false)
+                );
         this.startAndStopExecutor = 
Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE,
                 ThreadUtils.createThreadFactory(
                         "StartAndStopExecutor-" + clientId + "-%d", false));
@@ -1870,7 +1876,34 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         };
     }
 
+    /**
+     * Request task configs from the connector and write them to the config 
storage in case the configs are detected to
+     * have changed. This method retries infinitely with exponential backoff 
in case of any errors. The initial backoff
+     * used is {@link #RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS} with a 
multiplier of 2 and a maximum backoff of
+     * {@link #RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS}
+     *
+     * @param initialRequestTime the time in milliseconds when the original 
request was made (i.e. before any retries)
+     * @param connName the name of the connector
+     */
     private void reconfigureConnectorTasksWithRetry(long initialRequestTime, 
final String connName) {
+        ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
+                RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS,
+                2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS,
+                0);
+
+        
reconfigureConnectorTasksWithExponentialBackoffRetries(initialRequestTime, 
connName, exponentialBackoff, 0);
+    }
+
+    /**
+     * Request task configs from the connector and write them to the config 
storage in case the configs are detected to
+     * have changed. This method retries infinitely with exponential backoff 
in case of any errors.
+     *
+     * @param initialRequestTime the time in milliseconds when the original 
request was made (i.e. before any retries)
+     * @param connName the name of the connector
+     * @param exponentialBackoff {@link ExponentialBackoff} used to calculate 
the retry backoff duration
+     * @param attempts the number of retry attempts that have been made
+     */
+    private void reconfigureConnectorTasksWithExponentialBackoffRetries(long 
initialRequestTime, final String connName, ExponentialBackoff 
exponentialBackoff, int attempts) {
         reconfigureConnector(connName, (error, result) -> {
             // If we encountered an error, we don't have much choice but to 
just retry. If we don't, we could get
             // stuck with a connector that thinks it has generated tasks, but 
wasn't actually successful and therefore
@@ -1880,11 +1913,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 if (isPossibleExpiredKeyException(initialRequestTime, error)) {
                     log.debug("Failed to reconfigure connector's tasks ({}), 
possibly due to expired session key. Retrying after backoff", connName);
                 } else {
-                    log.error("Failed to reconfigure connector's tasks ({}), 
retrying after backoff:", connName, error);
+                    log.error("Failed to reconfigure connector's tasks ({}), 
retrying after backoff.", connName, error);
                 }
-                addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
+                addRequest(exponentialBackoff.backoff(attempts),
                     () -> {
-                        reconfigureConnectorTasksWithRetry(initialRequestTime, 
connName);
+                        
reconfigureConnectorTasksWithExponentialBackoffRetries(initialRequestTime, 
connName, exponentialBackoff, attempts + 1);
                         return null;
                     }, (err, res) -> {
                         if (err != null) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 99dffe17d9d..722be88fb7e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -80,6 +80,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
 import javax.crypto.SecretKey;
+import javax.ws.rs.core.HttpHeaders;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -90,15 +91,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -216,8 +218,7 @@ public class DistributedHerderTest {
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
     @Mock private Plugins plugins;
     @Mock private RestClient restClient;
-    private CountDownLatch shutdownCalled = new CountDownLatch(1);
-
+    private final CountDownLatch shutdownCalled = new CountDownLatch(1);
     private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
     private ExecutorService herderExecutor;
@@ -244,7 +245,7 @@ public class DistributedHerderTest {
                 new String[]{"connectorType", "updateDeletedConnectorStatus", 
"updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", 
"recordRestarting"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, 
KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, 
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
-                Collections.emptyList(), new AutoCloseable[]{uponShutdown});
+                Collections.emptyList(), null, new 
AutoCloseable[]{uponShutdown});
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener(time);
@@ -3694,6 +3695,113 @@ public class DistributedHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void 
testTaskReconfigurationRetriesWithConnectorTaskConfigsException() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+
+        member.ensureActive();
+        PowerMock.expectLastCall().anyTimes();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task 
configs")).times(2);
+
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig)).andReturn(TASK_CONFIGS);
+
+        expectAndVerifyTaskReconfigurationRetries();
+    }
+
+    @Test
+    public void 
testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
+        herder = PowerMock.createPartialMock(DistributedHerder.class,
+                new String[]{"connectorType", "updateDeletedConnectorStatus", 
"updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", 
"recordRestarting"},
+                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, 
KAFKA_CLUSTER_ID,
+                statusBackingStore, configBackingStore, member, MEMBER_URL, 
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
+                Collections.emptyList(), new MockSynchronousExecutor(), new 
AutoCloseable[]{});
+
+        rebalanceListener = herder.new RebalanceListener(time);
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), 
false);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall().anyTimes();
+
+        member.ensureActive();
+        PowerMock.expectLastCall().anyTimes();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new 
SinkConnectorConfig(plugins, CONN1_CONFIG);
+
+        List<Map<String, String>> changedTaskConfigs = new 
ArrayList<>(TASK_CONFIGS);
+        changedTaskConfigs.add(TASK_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, 
sinkConnectorConfig)).andReturn(changedTaskConfigs).anyTimes();
+
+        EasyMock.expect(restClient.httpRequest(
+                EasyMock.anyString(), EasyMock.eq("POST"), 
EasyMock.anyObject(HttpHeaders.class),
+                EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject(SecretKey.class), EasyMock.anyString())
+        ).andThrow(new ConnectException("Request to leader to reconfigure 
connector tasks failed")).times(2);
+
+        EasyMock.expect(restClient.httpRequest(
+                EasyMock.anyString(), EasyMock.eq("POST"), 
EasyMock.anyObject(HttpHeaders.class),
+                EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject(SecretKey.class), EasyMock.anyString())
+        ).andReturn(null);
+
+        expectAndVerifyTaskReconfigurationRetries();
+    }
+
+    private void expectAndVerifyTaskReconfigurationRetries() {
+        // task reconfiguration herder request with initial retry backoff
+        member.poll(EasyMock.eq(250L));
+        PowerMock.expectLastCall();
+
+        // task reconfiguration herder request with double the initial retry 
backoff
+        member.poll(EasyMock.eq(500L));
+        PowerMock.expectLastCall();
+
+        // the third task reconfiguration request is expected to pass; so 
expect no more retries (a Long.MAX_VALUE poll
+        // timeout indicates that there is no herder request currently in the 
queue)
+        member.poll(EasyMock.eq(Long.MAX_VALUE));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request 
with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();
+        // advance the time by 500ms so that the task reconfiguration request 
with double the initial retry backoff is processed
+        time.sleep(500);
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
     private void expectRebalance(final long offset,
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks) {
@@ -4013,6 +4121,43 @@ public class DistributedHerderTest {
     private abstract class BogusSourceTask extends SourceTask {
     }
 
+    /**
+     * A mock {@link ExecutorService} that runs tasks synchronously on the 
same thread as the caller. This mock
+     * implementation can't be "shut down" and it is the responsibility of the 
caller to ensure that {@link Runnable}s
+     * submitted via {@link #execute(Runnable)} don't hang indefinitely.
+     */
+    private static class MockSynchronousExecutor extends 
AbstractExecutorService {
+        @Override
+        public void execute(Runnable command) {
+            command.run();
+        }
+
+        @Override
+        public void shutdown() {
+
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return false;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return false;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+            return false;
+        }
+    }
+
     private DistributedHerder exactlyOnceHerder() {
         Map<String, String> config = new HashMap<>(HERDER_CONFIG);
         config.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
@@ -4020,7 +4165,7 @@ public class DistributedHerderTest {
                 new String[]{"connectorType", "updateDeletedConnectorStatus", 
"updateDeletedTaskStatus", "validateConnectorConfig"},
                 new DistributedConfig(config), worker, WORKER_ID, 
KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, 
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
-                Collections.emptyList(), new AutoCloseable[0]);
+                Collections.emptyList(), null, new AutoCloseable[0]);
     }
 
 }

Reply via email to