This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 09313b5 KAFKA-8449: Restart tasks on reconfiguration under
incremental cooperative rebalancing (#6850)
09313b5 is described below
commit 09313b5fda6ecf629a29d2e8285c6b21c91ddff2
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Mon Jun 3 09:13:40 2019 -0700
KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative
rebalancing (#6850)
Restart tasks on reconfiguration under incremental cooperative rebalancing,
and keep execution paths separate for config updates between eager and
cooperative. Include the group generation in the log message when the worker
receives its assignment.
Author: Konstantine Karantasis <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../runtime/distributed/DistributedHerder.java | 203 ++++++++++++++++-----
.../integration/MonitorableSourceConnector.java | 5 +-
.../RebalanceSourceConnectorsIntegrationTest.java | 63 ++++++-
3 files changed, 220 insertions(+), 51 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 585836e..52709f7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -80,6 +80,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static
org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
@@ -141,6 +143,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
// and the from other nodes are safe to process
private boolean rebalanceResolved;
private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
+ private Set<ConnectorTaskId> tasksToRestart = new HashSet<>();
private ExtendedAssignment assignment;
private boolean canReadConfigs;
private ClusterConfigState configState;
@@ -151,6 +154,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
// Config updates can be collected and applied together when possible.
Also, we need to take care to rebalance when
// needed (e.g. task reconfiguration, which requires everyone to
coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
+ private Set<ConnectorTaskId> taskConfigUpdates = new HashSet<>();
// Similarly collect target state changes (when observed by the config
storage listener) for handling in the
// herder's main thread.
private Set<String> connectorTargetStateChanges = new HashSet<>();
@@ -304,51 +308,47 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
// Process any configuration updates
- Set<String> connectorConfigUpdatesCopy = null;
- Set<String> connectorTargetStateChangesCopy = null;
- synchronized (this) {
- if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty() ||
!connectorTargetStateChanges.isEmpty()) {
- // Connector reconfigs only need local updates since there is
no coordination between workers required.
- // However, if connectors were added or removed, work needs to
be rebalanced since we have more work
- // items to distribute among workers.
- configState = configBackingStore.snapshot();
-
- if (needsReconfigRebalance) {
- // Task reconfigs require a rebalance. Request the
rebalance, clean out state, and then restart
- // this loop, which will then ensure the rebalance occurs
without any other requests being
- // processed until it completes.
- member.requestRejoin();
- // Any connector config updates or target state changes
will be addressed during the rebalance too
- connectorConfigUpdates.clear();
- connectorTargetStateChanges.clear();
- needsReconfigRebalance = false;
- log.debug("Requesting rebalance due to reconfiguration of
tasks (needsReconfigRebalance: {})",
- needsReconfigRebalance);
- return;
- } else {
- if (!connectorConfigUpdates.isEmpty()) {
- // We can't start/stop while locked since starting
connectors can cause task updates that will
- // require writing configs, which in turn make
callbacks into this class from another thread that
- // require acquiring a lock. This leads to deadlock.
Instead, just copy the info we need and process
- // the updates after unlocking.
- connectorConfigUpdatesCopy = connectorConfigUpdates;
- connectorConfigUpdates = new HashSet<>();
- }
+ AtomicReference<Set<String>> connectorConfigUpdatesCopy = new
AtomicReference<>();
+ AtomicReference<Set<String>> connectorTargetStateChangesCopy = new
AtomicReference<>();
+ AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new
AtomicReference<>();
+
+ boolean shouldReturn;
+ if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
+ shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
+ connectorTargetStateChangesCopy);
+ // With eager protocol we should return immediately if
needsReconfigRebalance has
+ // been set to retain the old workflow
+ if (shouldReturn) {
+ return;
+ }
- if (!connectorTargetStateChanges.isEmpty()) {
- // Similarly for target state changes which can cause
connectors to be restarted
- connectorTargetStateChangesCopy =
connectorTargetStateChanges;
- connectorTargetStateChanges = new HashSet<>();
- }
- }
+ if (connectorConfigUpdatesCopy.get() != null) {
+
processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
}
- }
- if (connectorConfigUpdatesCopy != null)
- processConnectorConfigUpdates(connectorConfigUpdatesCopy);
+ if (connectorTargetStateChangesCopy.get() != null) {
+
processTargetStateChanges(connectorTargetStateChangesCopy.get());
+ }
+ } else {
+ shouldReturn =
updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
+ connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
+
+ if (connectorConfigUpdatesCopy.get() != null) {
+
processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
+ }
+
+ if (connectorTargetStateChangesCopy.get() != null) {
+
processTargetStateChanges(connectorTargetStateChangesCopy.get());
+ }
- if (connectorTargetStateChangesCopy != null)
- processTargetStateChanges(connectorTargetStateChangesCopy);
+ if (taskConfigUpdatesCopy.get() != null) {
+
processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
+ }
+
+ if (shouldReturn) {
+ return;
+ }
+ }
// Let the group take any actions it needs to
try {
@@ -360,6 +360,95 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
}
+ private synchronized boolean
updateConfigsWithEager(AtomicReference<Set<String>> connectorConfigUpdatesCopy,
+
AtomicReference<Set<String>> connectorTargetStateChangesCopy) {
+ // This branch is here to avoid creating a snapshot if not needed
+ if (needsReconfigRebalance
+ || !connectorConfigUpdates.isEmpty()
+ || !connectorTargetStateChanges.isEmpty()) {
+ // Connector reconfigs only need local updates since there is no
coordination between workers required.
+ // However, if connectors were added or removed, work needs to be
rebalanced since we have more work
+ // items to distribute among workers.
+ configState = configBackingStore.snapshot();
+
+ if (needsReconfigRebalance) {
+ // Task reconfigs require a rebalance. Request the rebalance,
clean out state, and then restart
+ // this loop, which will then ensure the rebalance occurs
without any other requests being
+ // processed until it completes.
+ log.debug("Requesting rebalance due to reconfiguration of
tasks (needsReconfigRebalance: {})",
+ needsReconfigRebalance);
+ member.requestRejoin();
+ needsReconfigRebalance = false;
+ // Any connector config updates or target state changes will
be addressed during the rebalance too
+ connectorConfigUpdates.clear();
+ connectorTargetStateChanges.clear();
+ return true;
+ } else {
+ if (!connectorConfigUpdates.isEmpty()) {
+ // We can't start/stop while locked since starting
connectors can cause task updates that will
+ // require writing configs, which in turn make callbacks
into this class from another thread that
+ // require acquiring a lock. This leads to deadlock.
Instead, just copy the info we need and process
+ // the updates after unlocking.
+ connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+ connectorConfigUpdates = new HashSet<>();
+ }
+
+ if (!connectorTargetStateChanges.isEmpty()) {
+ // Similarly for target state changes which can cause
connectors to be restarted
+
connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+ connectorTargetStateChanges = new HashSet<>();
+ }
+ }
+ }
+ return false;
+ }
+
+ private synchronized boolean
updateConfigsWithIncrementalCooperative(AtomicReference<Set<String>>
connectorConfigUpdatesCopy,
+
AtomicReference<Set<String>> connectorTargetStateChangesCopy,
+
AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy) {
+ boolean retValue = false;
+ // This branch is here to avoid creating a snapshot if not needed
+ if (needsReconfigRebalance
+ || !connectorConfigUpdates.isEmpty()
+ || !connectorTargetStateChanges.isEmpty()
+ || !taskConfigUpdates.isEmpty()) {
+ // Connector reconfigs only need local updates since there is no
coordination between workers required.
+ // However, if connectors were added or removed, work needs to be
rebalanced since we have more work
+ // items to distribute among workers.
+ configState = configBackingStore.snapshot();
+
+ if (needsReconfigRebalance) {
+ log.debug("Requesting rebalance due to reconfiguration of
tasks (needsReconfigRebalance: {})",
+ needsReconfigRebalance);
+ member.requestRejoin();
+ needsReconfigRebalance = false;
+ retValue = true;
+ }
+
+ if (!connectorConfigUpdates.isEmpty()) {
+ // We can't start/stop while locked since starting connectors
can cause task updates that will
+ // require writing configs, which in turn make callbacks into
this class from another thread that
+ // require acquiring a lock. This leads to deadlock. Instead,
just copy the info we need and process
+ // the updates after unlocking.
+ connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+ connectorConfigUpdates = new HashSet<>();
+ }
+
+ if (!connectorTargetStateChanges.isEmpty()) {
+ // Similarly for target state changes which can cause
connectors to be restarted
+
connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+ connectorTargetStateChanges = new HashSet<>();
+ }
+
+ if (!taskConfigUpdates.isEmpty()) {
+ // Similarly for task config updates
+ taskConfigUpdatesCopy.set(taskConfigUpdates);
+ taskConfigUpdates = new HashSet<>();
+ }
+ }
+ return retValue;
+ }
+
private void processConnectorConfigUpdates(Set<String>
connectorConfigUpdates) {
// If we only have connector config updates, we can just bounce the
updated connectors that are
// currently assigned to this worker.
@@ -396,6 +485,21 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
}
}
+ private void
processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId>
taskConfigUpdates) {
+ Set<ConnectorTaskId> localTasks = assignment == null
+ ? Collections.emptySet()
+ : new HashSet<>(assignment.tasks());
+ Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
+ .map(ConnectorTaskId::connector).collect(Collectors.toSet());
+
+ List<ConnectorTaskId> tasksToStop = localTasks.stream()
+ .filter(taskId ->
connectorsWhoseTasksToStop.contains(taskId.connector()))
+ .collect(Collectors.toList());
+ log.info("Handling task config update by restarting tasks {}",
tasksToStop);
+ worker.stopAndAwaitTasks(tasksToStop);
+ tasksToRestart.addAll(tasksToStop);
+ }
+
// public for testing
public void halt() {
synchronized (this) {
@@ -900,6 +1004,12 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
callables.add(getConnectorStartingCallable(connectorName));
}
+ // These tasks have been stopped by this worker due to task
reconfiguration. In order to
+ // restart them, they are removed just before the overall task startup
from the set of
+ // currently running tasks. Therefore, they'll be restarted only if
they are included in
+ // the assignment that was just received after rebalancing.
+ runningAssignment.tasks().removeAll(tasksToRestart);
+ tasksToRestart.clear();
for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(),
runningAssignment.tasks())) {
callables.add(getTaskStartingCallable(taskId));
}
@@ -1172,12 +1282,17 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
log.info("Tasks {} configs updated", tasks);
- // Stage the update and wake up the work thread. No need to record
the set of tasks here because task reconfigs
- // always need a rebalance to ensure offsets get committed.
+ // Stage the update and wake up the work thread.
+ // The set of tasks is recorded for incremental cooperative
rebalancing, in which
+ // tasks don't get restarted unless they are balanced between
workers.
+ // With eager rebalancing there's no need to record the set of
tasks because task reconfigs
+ // always need a rebalance to ensure offsets get committed. In
eager rebalancing the
+ // recorded set of tasks remains unused.
// TODO: As an optimization, some task config updates could avoid
a rebalance. In particular, single-task
// connectors clearly don't need any coordination.
synchronized (DistributedHerder.this) {
needsReconfigRebalance = true;
+ taskConfigUpdates.addAll(tasks);
}
member.wakeup();
}
@@ -1279,7 +1394,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
// catch up (or backoff if we fail) not executed in a callback,
and so we'll be able to invoke other
// group membership actions (e.g., we may need to explicitly leave
the group if we cannot handle the
// assigned tasks).
- log.info("Joined group and got assignment: {}", assignment);
+ log.info("Joined group at generation {} and got assignment: {}",
generation, assignment);
synchronized (DistributedHerder.this) {
DistributedHerder.this.assignment = assignment;
DistributedHerder.this.generation = generation;
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 8bc8953..2ca7698 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -44,6 +44,7 @@ import java.util.stream.LongStream;
public class MonitorableSourceConnector extends TestSourceConnector {
private static final Logger log =
LoggerFactory.getLogger(MonitorableSourceConnector.class);
+ public static final String TOPIC_CONFIG = "topic";
private String connectorName;
private ConnectorHandle connectorHandle;
private Map<String, String> commonConfigs;
@@ -105,7 +106,7 @@ public class MonitorableSourceConnector extends
TestSourceConnector {
public void start(Map<String, String> props) {
taskId = props.get("task.id");
connectorName = props.get("connector.name");
- topicName = props.getOrDefault("topic", "sequential-topic");
+ topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
throughput = Long.valueOf(props.getOrDefault("throughput", "-1"));
batchSize =
Integer.valueOf(props.getOrDefault("messages.per.poll", "1"));
taskHandle =
RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
@@ -113,7 +114,7 @@ public class MonitorableSourceConnector extends
TestSourceConnector {
context.offsetStorageReader().offset(Collections.singletonMap("task.id",
taskId)))
.orElse(Collections.emptyMap());
startingSeqno = Optional.ofNullable((Long)
offset.get("saved")).orElse(0L);
- log.info("Started {} task {}", this.getClass().getSimpleName(),
taskId);
+ log.info("Started {} task {} with properties {}",
this.getClass().getSimpleName(), taskId, props);
throttler = new ThroughputThrottler(throughput,
System.currentTimeMillis());
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index b0125b2..d3cc8db 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -39,17 +39,18 @@ import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static
org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
/**
* Integration tests for incremental cooperative rebalancing between Connect
workers
@@ -109,7 +110,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put("throughput", String.valueOf(1));
props.put("messages.per.poll", String.valueOf(10));
- props.put(TOPICS_CONFIG, TOPIC_NAME);
+ props.put(TOPIC_CONFIG, TOPIC_NAME);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
@@ -130,6 +131,58 @@ public class RebalanceSourceConnectorsIntegrationTest {
}
@Test
+ public void testReconfigConnector() throws Exception {
+ ConnectorHandle connectorHandle =
RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+
+ // create test topics
+ String anotherTopic = "another-topic";
+ connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+ connect.kafka().createTopic(anotherTopic, NUM_TOPIC_PARTITIONS);
+
+ // set up props for the source connector
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getSimpleName());
+ props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+ props.put("throughput", String.valueOf(1));
+ props.put("messages.per.poll", String.valueOf(10));
+ props.put(TOPIC_CONFIG, TOPIC_NAME);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
+
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ waitForCondition(() ->
this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in
time.");
+
+ int numRecordsProduced = 100;
+ int recordTransferDurationMs = 5000;
+
+ // consume all records from the source topic or fail, to ensure that
they were correctly produced
+ int recordNum = connect.kafka().consume(numRecordsProduced,
recordTransferDurationMs, TOPIC_NAME).count();
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + numRecordsProduced + " + but got " + recordNum,
+ recordNum >= numRecordsProduced);
+
+ // Reconfigure the source connector by changing the Kafka topic used
as output
+ props.put(TOPIC_CONFIG, anotherTopic);
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ waitForCondition(() ->
this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in
time.");
+
+ // expect all records to be produced by the connector
+ connectorHandle.expectedRecords(numRecordsProduced);
+
+ // expect all produced records to be committed by the connector
+ connectorHandle.expectedCommits(numRecordsProduced);
+
+ // consume all records from the source topic or fail, to ensure that
they were correctly produced
+ recordNum = connect.kafka().consume(numRecordsProduced,
recordTransferDurationMs, anotherTopic).count();
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + numRecordsProduced + " + but got " + recordNum,
+ recordNum >= numRecordsProduced);
+ }
+
+ @Test
public void testDeleteConnector() throws Exception {
// create test topic
connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
@@ -140,7 +193,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put("throughput", String.valueOf(1));
props.put("messages.per.poll", String.valueOf(10));
- props.put(TOPICS_CONFIG, TOPIC_NAME);
+ props.put(TOPIC_CONFIG, TOPIC_NAME);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
@@ -181,7 +234,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put("throughput", String.valueOf(1));
props.put("messages.per.poll", String.valueOf(10));
- props.put(TOPICS_CONFIG, TOPIC_NAME);
+ props.put(TOPIC_CONFIG, TOPIC_NAME);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
@@ -224,7 +277,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put("throughput", String.valueOf(1));
props.put("messages.per.poll", String.valueOf(10));
- props.put(TOPICS_CONFIG, TOPIC_NAME);
+ props.put(TOPIC_CONFIG, TOPIC_NAME);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());