This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8dd697b05f3 KAFKA-14732: Use an exponential backoff retry mechanism
while reconfiguring connector tasks (#13276)
8dd697b05f3 is described below
commit 8dd697b05f36c319bd6c50d6e499168aac172e02
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Feb 28 21:06:34 2023 +0530
KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring
connector tasks (#13276)
Reviewers: Chaitanya Mukka <[email protected]>, Chris Egerton
<[email protected]>
---
.../runtime/distributed/DistributedHerder.java | 49 +++++--
.../runtime/distributed/DistributedHerderTest.java | 155 ++++++++++++++++++++-
2 files changed, 191 insertions(+), 13 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b68cc443ea7..8a8d8dd95df 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
@@ -147,7 +148,8 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(10);
private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(1);
- private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+ private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS =
250;
+ private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS =
60000;
private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250;
private static final int START_STOP_THREAD_POOL_SIZE = 8;
private static final short BACKOFF_RETRIES = 5;
@@ -251,7 +253,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
List<String> restNamespace,
AutoCloseable... uponShutdown) {
this(config, worker, worker.workerId(), kafkaClusterId,
statusBackingStore, configBackingStore, null, restUrl, restClient,
worker.metrics(),
- time, connectorClientConfigOverridePolicy, restNamespace,
uponShutdown);
+ time, connectorClientConfigOverridePolicy, restNamespace,
null, uponShutdown);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@@ -269,6 +271,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
Time time,
ConnectorClientConfigOverridePolicy
connectorClientConfigOverridePolicy,
List<String> restNamespace,
+ ExecutorService forwardRequestExecutor,
AutoCloseable... uponShutdown) {
super(worker, workerId, kafkaClusterId, statusBackingStore,
configBackingStore, connectorClientConfigOverridePolicy);
@@ -302,9 +305,12 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
ThreadUtils.createThreadFactory(
this.getClass().getSimpleName() + "-" + clientId +
"-%d", false));
- this.forwardRequestExecutor = Executors.newFixedThreadPool(1,
- ThreadUtils.createThreadFactory(
- "ForwardRequestExecutor-" + clientId + "-%d", false));
+ this.forwardRequestExecutor = forwardRequestExecutor != null
+ ? forwardRequestExecutor
+ : Executors.newFixedThreadPool(
+ 1,
+
ThreadUtils.createThreadFactory("ForwardRequestExecutor-" + clientId + "-%d",
false)
+ );
this.startAndStopExecutor =
Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE,
ThreadUtils.createThreadFactory(
"StartAndStopExecutor-" + clientId + "-%d", false));
@@ -1870,7 +1876,34 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
};
}
+ /**
+ * Request task configs from the connector and write them to the config
storage in case the configs are detected to
+ * have changed. This method retries infinitely with exponential backoff
in case of any errors. The initial backoff
+ * used is {@link #RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS} with a
multiplier of 2 and a maximum backoff of
+ * {@link #RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS}
+ *
+ * @param initialRequestTime the time in milliseconds when the original
request was made (i.e. before any retries)
+ * @param connName the name of the connector
+ */
private void reconfigureConnectorTasksWithRetry(long initialRequestTime,
final String connName) {
+ ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
+ RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS,
+ 2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS,
+ 0);
+
+
reconfigureConnectorTasksWithExponentialBackoffRetries(initialRequestTime,
connName, exponentialBackoff, 0);
+ }
+
+ /**
+ * Request task configs from the connector and write them to the config
storage in case the configs are detected to
+ * have changed. This method retries infinitely with exponential backoff
in case of any errors.
+ *
+ * @param initialRequestTime the time in milliseconds when the original
request was made (i.e. before any retries)
+ * @param connName the name of the connector
+ * @param exponentialBackoff {@link ExponentialBackoff} used to calculate
the retry backoff duration
+ * @param attempts the number of retry attempts that have been made
+ */
+ private void reconfigureConnectorTasksWithExponentialBackoffRetries(long
initialRequestTime, final String connName, ExponentialBackoff
exponentialBackoff, int attempts) {
reconfigureConnector(connName, (error, result) -> {
// If we encountered an error, we don't have much choice but to
just retry. If we don't, we could get
// stuck with a connector that thinks it has generated tasks, but
wasn't actually successful and therefore
@@ -1880,11 +1913,11 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
if (isPossibleExpiredKeyException(initialRequestTime, error)) {
log.debug("Failed to reconfigure connector's tasks ({}),
possibly due to expired session key. Retrying after backoff", connName);
} else {
- log.error("Failed to reconfigure connector's tasks ({}),
retrying after backoff:", connName, error);
+ log.error("Failed to reconfigure connector's tasks ({}),
retrying after backoff.", connName, error);
}
- addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
+ addRequest(exponentialBackoff.backoff(attempts),
() -> {
- reconfigureConnectorTasksWithRetry(initialRequestTime,
connName);
+
reconfigureConnectorTasksWithExponentialBackoffRetries(initialRequestTime,
connName, exponentialBackoff, attempts + 1);
return null;
}, (err, res) -> {
if (err != null) {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 99dffe17d9d..722be88fb7e 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -80,6 +80,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import javax.crypto.SecretKey;
+import javax.ws.rs.core.HttpHeaders;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -90,15 +91,16 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -216,8 +218,7 @@ public class DistributedHerderTest {
@Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
@Mock private Plugins plugins;
@Mock private RestClient restClient;
- private CountDownLatch shutdownCalled = new CountDownLatch(1);
-
+ private final CountDownLatch shutdownCalled = new CountDownLatch(1);
private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener;
private ExecutorService herderExecutor;
@@ -244,7 +245,7 @@ public class DistributedHerderTest {
new String[]{"connectorType", "updateDeletedConnectorStatus",
"updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan",
"recordRestarting"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID,
KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL,
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
- Collections.emptyList(), new AutoCloseable[]{uponShutdown});
+ Collections.emptyList(), null, new
AutoCloseable[]{uponShutdown});
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener(time);
@@ -3694,6 +3695,113 @@ public class DistributedHerderTest {
PowerMock.verifyAll();
}
+ /**
+ * Verifies that when generating task configs for a connector fails, the task reconfiguration request is retried
+ * with exponential backoff (poll timeouts of 250 ms and then 500 ms are expected) until the third attempt succeeds.
+ * The worker is set up with member id "leader" (presumably acting as leader via the final {@code true} passed to
+ * {@code expectRebalance} - TODO confirm against that overload).
+ */
+ @Test
+ public void
testTaskReconfigurationRetriesWithConnectorTaskConfigsException() {
+ EasyMock.expect(member.memberId()).andStubReturn("leader");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(),
true);
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+ // end of initial tick
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ member.wakeup();
+ PowerMock.expectLastCall().anyTimes();
+
+ member.ensureActive();
+ PowerMock.expectLastCall().anyTimes();
+
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+ // Task config generation fails on the first two attempts and succeeds on the third. The order of these two
+ // expectations matters: EasyMock consumes the throwing expectation (times(2)) before the returning one.
+ SinkConnectorConfig sinkConnectorConfig = new
SinkConnectorConfig(plugins, CONN1_CONFIG);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1,
sinkConnectorConfig))
+ .andThrow(new ConnectException("Failed to generate task
configs")).times(2);
+
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1,
sinkConnectorConfig)).andReturn(TASK_CONFIGS);
+
+ // Records the expected backoff schedule, then replays, drives and verifies the herder.
+ expectAndVerifyTaskReconfigurationRetries();
+ }
+
+ /**
+ * Verifies that when a non-leader worker (member id "member", and {@code false} passed to {@code expectRebalance})
+ * fails to forward new task configs to the leader via a REST POST, the task reconfiguration request is retried
+ * with exponential backoff (poll timeouts of 250 ms and then 500 ms are expected) until the third POST succeeds.
+ */
+ @Test
+ public void
testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
+ // Rebuild the herder with a MockSynchronousExecutor as its forward-request executor, so the request to the
+ // leader runs inline on the calling thread instead of on a background thread.
+ herder = PowerMock.createPartialMock(DistributedHerder.class,
+ new String[]{"connectorType", "updateDeletedConnectorStatus",
"updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan",
"recordRestarting"},
+ new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID,
KAFKA_CLUSTER_ID,
+ statusBackingStore, configBackingStore, member, MEMBER_URL,
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
+ Collections.emptyList(), new MockSynchronousExecutor(), new
AutoCloseable[]{});
+
+ // RebalanceListener is an inner class bound to a specific herder instance, so it is recreated for the new herder.
+ rebalanceListener = herder.new RebalanceListener(time);
+
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+
EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+ expectRebalance(1, Collections.emptyList(), Collections.emptyList(),
false);
+ expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+ // end of initial tick
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ member.wakeup();
+ PowerMock.expectLastCall().anyTimes();
+
+ member.ensureActive();
+ PowerMock.expectLastCall().anyTimes();
+
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+ EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+ SinkConnectorConfig sinkConnectorConfig = new
SinkConnectorConfig(plugins, CONN1_CONFIG);
+
+ // Appending an extra task config presumably makes the generated configs differ from those in SNAPSHOT, so the
+ // herder has changed configs to forward to the leader - confirm against SNAPSHOT's task configs.
+ List<Map<String, String>> changedTaskConfigs = new
ArrayList<>(TASK_CONFIGS);
+ changedTaskConfigs.add(TASK_CONFIG);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1,
sinkConnectorConfig)).andReturn(changedTaskConfigs).anyTimes();
+
+ // The first two POSTs to the leader fail and the third succeeds. As in the other retry test, the throwing
+ // expectation must be recorded before the succeeding one.
+ EasyMock.expect(restClient.httpRequest(
+ EasyMock.anyString(), EasyMock.eq("POST"),
EasyMock.anyObject(HttpHeaders.class),
+ EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject(SecretKey.class), EasyMock.anyString())
+ ).andThrow(new ConnectException("Request to leader to reconfigure
connector tasks failed")).times(2);
+
+ EasyMock.expect(restClient.httpRequest(
+ EasyMock.anyString(), EasyMock.eq("POST"),
EasyMock.anyObject(HttpHeaders.class),
+ EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject(SecretKey.class), EasyMock.anyString())
+ ).andReturn(null);
+
+ // Records the expected backoff schedule, then replays, drives and verifies the herder.
+ expectAndVerifyTaskReconfigurationRetries();
+ }
+
+ /**
+ * Shared tail of the task-reconfiguration retry tests, for a reconfiguration that fails twice and then succeeds.
+ * It records the expected {@code member.poll} timeouts, replays all mocks, drives the herder through the matching
+ * ticks (advancing the mock time between them), and finally verifies all mocks. Callers must record their own
+ * expectations before invoking this method, because it calls {@code PowerMock.replayAll()}.
+ * <p>
+ * The hard-coded 250 ms / 500 ms values mirror the herder's initial reconfiguration backoff
+ * (RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS) and its backoff multiplier of 2, and must be kept in sync with them.
+ */
+ private void expectAndVerifyTaskReconfigurationRetries() {
+ // task reconfiguration herder request with initial retry backoff
+ member.poll(EasyMock.eq(250L));
+ PowerMock.expectLastCall();
+
+ // task reconfiguration herder request with double the initial retry backoff
+ member.poll(EasyMock.eq(500L));
+ PowerMock.expectLastCall();
+
+ // the third task reconfiguration request is expected to pass; so expect no more retries (a Long.MAX_VALUE poll
+ // timeout indicates that there is no herder request currently in the queue)
+ member.poll(EasyMock.eq(Long.MAX_VALUE));
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ // initial tick
+ herder.tick();
+ herder.requestTaskReconfiguration(CONN1);
+ // process the task reconfiguration request in this tick
+ herder.tick();
+ // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+ time.sleep(250);
+ herder.tick();
+ // advance the time by 500ms so that the task reconfiguration request with double the initial retry backoff is processed
+ time.sleep(500);
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
private void expectRebalance(final long offset,
final List<String> assignedConnectors,
final List<ConnectorTaskId> assignedTasks) {
@@ -4013,6 +4121,43 @@ public class DistributedHerderTest {
private abstract class BogusSourceTask extends SourceTask {
}
+    /**
+     * A mock {@link ExecutorService} that runs each submitted task synchronously on the caller's thread, so a test
+     * observes the effects (including failures) of a task as soon as it has been submitted. It is the caller's
+     * responsibility to ensure that {@link Runnable}s submitted via {@link #execute(Runnable)} don't hang indefinitely.
+     * <p>
+     * Shutdown follows the {@link ExecutorService} contract, so this can stand in for a real executor:
+     * <ul>
+     *   <li>Once {@link #shutdown()} or {@link #shutdownNow()} has been called, further submissions are rejected
+     *   with a {@link RejectedExecutionException}.</li>
+     *   <li>Tasks never queue up, so {@link #shutdownNow()} always returns an empty list, never {@code null}.</li>
+     *   <li>The executor reports itself as terminated as soon as it is shut down. It does not wait for a task that
+     *   may still be running on another thread at that moment.</li>
+     * </ul>
+     */
+    private static class MockSynchronousExecutor extends AbstractExecutorService {
+        // volatile in case shutdown is requested from a different thread than the one submitting tasks
+        private volatile boolean shutdown = false;
+
+        @Override
+        public void execute(Runnable command) {
+            if (shutdown) {
+                // Mirrors a real thread pool executor, which rejects work submitted after shutdown.
+                throw new RejectedExecutionException("MockSynchronousExecutor has been shut down");
+            }
+            command.run();
+        }
+
+        @Override
+        public void shutdown() {
+            shutdown = true;
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            shutdown = true;
+            // Tasks run immediately upon submission, so there is never anything waiting to be run.
+            return Collections.emptyList();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return shutdown;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            // No task is ever queued, so being shut down implies being terminated.
+            return shutdown;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            // Nothing to wait for: report termination immediately if (and only if) shutdown has been requested.
+            return shutdown;
+        }
+    }
+
private DistributedHerder exactlyOnceHerder() {
Map<String, String> config = new HashMap<>(HERDER_CONFIG);
config.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
@@ -4020,7 +4165,7 @@ public class DistributedHerderTest {
new String[]{"connectorType", "updateDeletedConnectorStatus",
"updateDeletedTaskStatus", "validateConnectorConfig"},
new DistributedConfig(config), worker, WORKER_ID,
KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL,
restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
- Collections.emptyList(), new AutoCloseable[0]);
+ Collections.emptyList(), null, new AutoCloseable[0]);
}
}