Repository: kafka Updated Branches: refs/heads/trunk fc89083f8 -> 8911660e2
KAFKA-3674: Ensure connector target state changes propagated to worker Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1341 from hachikuji/KAFKA-3674 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8911660e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8911660e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8911660e Branch: refs/heads/trunk Commit: 8911660e2e7d9553502974393ad1aa04852c2da2 Parents: fc89083 Author: Jason Gustafson <[email protected]> Authored: Mon May 9 00:12:30 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon May 9 00:12:30 2016 -0700 ---------------------------------------------------------------------- .../runtime/distributed/DistributedHerder.java | 22 +- .../storage/KafkaConfigBackingStore.java | 30 ++- .../connect/runtime/WorkerSourceTaskTest.java | 4 +- .../distributed/DistributedHerderTest.java | 204 +++++++++++++++ .../storage/KafkaConfigBackingStoreTest.java | 258 ++++++++++++++++++- 5 files changed, 498 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index a2beff3..afabbeb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -309,15 +309,21 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } private void processTargetStateChanges(Set<String> connectorTargetStateChanges) { - if (!connectorTargetStateChanges.isEmpty()) { - for (String connector : connectorTargetStateChanges) { - if (worker.connectorNames().contains(connector)) { - TargetState targetState = configState.targetState(connector); - worker.setTargetState(connector, targetState); - if (targetState == TargetState.STARTED) - reconfigureConnectorTasksWithRetry(connector); - } + for (String connector : connectorTargetStateChanges) { + TargetState targetState = configState.targetState(connector); + if (!configState.connectors().contains(connector)) { + log.debug("Received target state change for unknown connector: {}", connector); + continue; } + + // we must propagate the state change to the worker so that the connector's + // tasks can transition to the new target state + worker.setTargetState(connector, targetState); + + // additionally, if the worker is running the connector itself, then we need to + // request reconfiguration to ensure that config changes while paused take effect + if (worker.ownsConnector(connector) && targetState == TargetState.STARTED) + reconfigureConnectorTasksWithRetry(connector); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 9412e42..a894f31 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -317,8 +317,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { @Override public void removeConnectorConfig(String connector) { log.debug("Removing connector configuration for connector {}", connector); - updateConnectorConfig(connector, null); - configLog.send(TARGET_STATE_KEY(connector), null); + try { + configLog.send(CONNECTOR_KEY(connector), null); + configLog.send(TARGET_STATE_KEY(connector), null); + configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Failed to remove connector configuration from Kafka: ", e); + throw new ConnectException("Error removing connector configuration from Kafka", e); + } } @Override @@ -437,8 +443,19 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { if (record.key().startsWith(TARGET_STATE_PREFIX)) { String connectorName = record.key().substring(TARGET_STATE_PREFIX.length()); + boolean removed = false; synchronized (lock) { - if (value.value() != null) { + if (value.value() == null) { + // When connector configs are removed, we also write tombstones for the target state. + log.debug("Removed target state for connector {} due to null value in topic.", connectorName); + connectorTargetStates.remove(connectorName); + removed = true; + + // If for some reason we still have configs for the connector, add back the default + // STARTED state to ensure each connector always has a valid target state. + if (connectorConfigs.containsKey(connectorName)) + connectorTargetStates.put(connectorName, TargetState.STARTED); + } else { if (!(value.value() instanceof Map)) { log.error("Found target state ({}) in wrong format: {}", record.key(), value.value().getClass()); return; @@ -461,8 +478,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { } } - if (!starting) + // Note that we do not notify the update listener if the target state has been removed. + // Instead we depend on the removal callback of the connector config itself to notify the worker. + if (!starting && !removed) updateListener.onConnectorTargetStateChange(connectorName); + } else if (record.key().startsWith(CONNECTOR_PREFIX)) { String connectorName = record.key().substring(CONNECTOR_PREFIX.length()); boolean removed = false; @@ -487,6 +507,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { log.debug("Updating configuration for connector " + connectorName + " configuration: " + newConnectorConfig); connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig); + // Set the initial state of the connector to STARTED, which ensures that any connectors + // which were created with 0.9 Connect will be initialized in the STARTED state. if (!connectorTargetStates.containsKey(connectorName)) connectorTargetStates.put(connectorName, TargetState.STARTED); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 0d805da..0768781 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -203,7 +203,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { int priorCount = count.get(); Thread.sleep(100); - assertEquals(priorCount, count.get()); + + // since the transition is observed asynchronously, the count could be off by one loop iteration + assertTrue(count.get() - priorCount <= 1); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index fbccc55..81e6be8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -130,6 +130,9 @@ public class DistributedHerderTest { private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.<String>emptySet()); + private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.PAUSED), + TASK_CONFIGS_MAP, Collections.<String>emptySet()); private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED), TASK_CONFIGS_MAP, Collections.<String>emptySet()); @@ -747,6 +750,207 @@ public class DistributedHerderTest { } @Test + public void testConnectorPaused() throws Exception { + // ensure that target state changes are propagated to the worker + + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); + + // join + expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall(); + EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // handle the state change + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); + PowerMock.expectLastCall(); + + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true); + + worker.setTargetState(CONN1, TargetState.PAUSED); + PowerMock.expectLastCall(); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused + herder.tick(); // worker should apply the state change + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorResumed() throws Exception { + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); + + // start with the connector paused + expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1); + worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(), + EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); + PowerMock.expectLastCall(); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // handle the state change + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + PowerMock.expectLastCall(); + + // we expect reconfiguration after resuming + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true); + EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + + worker.setTargetState(CONN1, TargetState.STARTED); + PowerMock.expectLastCall(); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to started + herder.tick(); // apply state change + + PowerMock.verifyAll(); + } + + @Test + public void testUnknownConnectorPaused() throws Exception { + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); + + // join + expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); + expectPostRebalanceCatchup(SNAPSHOT); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // state change is ignored since we have no target state + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + PowerMock.expectLastCall(); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + configUpdateListener.onConnectorTargetStateChange("unknown-connector"); + herder.tick(); // continue + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorPausedRunningTaskOnly() throws Exception { + // even if we don't own the connector, we should still propagate target state + // changes to the worker so that tasks will transition correctly + + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet()); + + // join + expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); + expectPostRebalanceCatchup(SNAPSHOT); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // handle the state change + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); + PowerMock.expectLastCall(); + + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); + + worker.setTargetState(CONN1, TargetState.PAUSED); + PowerMock.expectLastCall(); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused + herder.tick(); // apply state change + + PowerMock.verifyAll(); + } + + @Test + public void testConnectorResumedRunningTaskOnly() throws Exception { + // even if we don't own the connector, we should still propagate target state + // changes to the worker so that tasks will transition correctly + + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet()); + + // join + expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0)); + expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1); + worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // handle the state change + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + PowerMock.expectLastCall(); + + EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false); + + worker.setTargetState(CONN1, TargetState.STARTED); + PowerMock.expectLastCall(); + + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); // join + configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused + herder.tick(); // apply state change + + PowerMock.verifyAll(); + } + + @Test public void testTaskConfigAdded() { // Task config always requires rebalance EasyMock.expect(member.memberId()).andStubReturn("member"); http://git-wip-us.apache.org/repos/asf/kafka/blob/8911660e/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 617177e..f5bce8f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.util.Callback; @@ -53,9 +54,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @PrepareForTest(KafkaConfigBackingStore.class) @@ -107,6 +110,7 @@ public class KafkaConfigBackingStoreTest { new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)) ); + private static final Struct TARGET_STATE_PAUSED = new Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED"); private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); @@ -181,15 +185,10 @@ public class KafkaConfigBackingStoreTest { EasyMock.expectLastCall(); // Config deletion - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null); + expectConnectorRemoval(CONNECTOR_CONFIG_KEYS.get(1), TARGET_STATE_KEYS.get(1)); configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1)); EasyMock.expectLastCall(); - // Target state deletion - storeLog.send(TARGET_STATE_KEYS.get(1), null); - PowerMock.expectLastCall(); - expectStop(); PowerMock.replayAll(); @@ -220,9 +219,10 @@ public class KafkaConfigBackingStoreTest { // Deletion should remove the second one we added configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1)); configState = configStorage.snapshot(); - assertEquals(3, configState.offset()); + assertEquals(4, configState.offset()); assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); + assertNull(configState.targetState(CONNECTOR_IDS.get(1))); configStorage.stop(); @@ -346,6 +346,176 @@ public class KafkaConfigBackingStoreTest { } @Test + public void testRestoreTargetState() throws Exception { + expectConfigure(); + List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4))); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + + // Shouldn't see any callbacks since this is during startup + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Should see a single connector with initial state paused + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0))); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testBackgroundUpdateTargetState() throws Exception { + // verify that we handle target state changes correctly when they come up through the log + + expectConfigure(); + List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3))); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + + expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), TARGET_STATE_PAUSED); + + configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0)); + EasyMock.expectLastCall(); + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Should see a single connector with initial state paused + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + configStorage.refresh(0, TimeUnit.SECONDS); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testBackgroundConnectorDeletion() throws Exception { + // verify that we handle connector deletions correctly when they come up through the log + + expectConfigure(); + List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3))); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + + LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>(); + serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)); + serializedData.put(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(1)); + + Map<String, Struct> deserializedData = new HashMap<>(); + deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null); + deserializedData.put(TARGET_STATE_KEYS.get(0), null); + + expectRead(serializedData, deserializedData); + + configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0)); + EasyMock.expectLastCall(); + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Should see a single connector with initial state paused + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + configStorage.refresh(0, TimeUnit.SECONDS); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testRestoreTargetStateUnexpectedDeletion() throws Exception { + expectConfigure(); + List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4))); + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), null); + deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + logOffset = 5; + + expectStart(existingRecords, deserialized); + + // Shouldn't see any callbacks since this is during startup + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // The target state deletion should reset the state to STARTED + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(5, configState.offset()); // Should always be next to be read, even if uncommitted + assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test public void testRestore() throws Exception { // Restoring data should notify only of the latest values after loading is complete. This also validates // that inconsistent state is ignored. @@ -385,6 +555,7 @@ public class KafkaConfigBackingStoreTest { ClusterConfigState configState = configStorage.snapshot(); assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); + assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0))); // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected @@ -400,6 +571,51 @@ public class KafkaConfigBackingStoreTest { } @Test + public void testRestoreConnectorDeletion() throws Exception { + // Restoring data should notify only of the latest values after loading is complete. This also validates + // that inconsistent state is ignored. + + expectConfigure(); + // Overwrite each type at least once to ensure we see the latest data after loading + List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList( + new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)), + new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)), + new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)), + new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)), + new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(4)), + new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5))); + + LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap(); + deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0)); + deserialized.put(CONFIGS_SERIALIZED.get(3), null); + deserialized.put(CONFIGS_SERIALIZED.get(4), null); + deserialized.put(CONFIGS_SERIALIZED.get(5), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR); + + logOffset = 6; + expectStart(existingRecords, deserialized); + + // Shouldn't see any callbacks since this is during startup + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Should see a single connector and its config should be the last one seen anywhere in the log + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(6, configState.offset()); // Should always be next to be read, even if uncommitted + assertTrue(configState.connectors().isEmpty()); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test public void testRestoreZeroTasks() throws Exception { // Restoring data should notify only of the latest values after loading is complete. This also validates // that inconsistent state is ignored. @@ -558,6 +774,22 @@ public class KafkaConfigBackingStoreTest { PowerMock.expectLastCall(); } + private void expectRead(LinkedHashMap<String, byte[]> serializedValues, + Map<String, Struct> deserializedValues) { + expectReadToEnd(serializedValues); + for (Map.Entry<String, Struct> deserializedValueEntry : deserializedValues.entrySet()) { + byte[] serializedValue = serializedValues.get(deserializedValueEntry.getKey()); + EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serializedValue))) + .andReturn(new SchemaAndValue(null, structToMap(deserializedValueEntry.getValue()))); + } + } + + private void expectRead(final String key, final byte[] serializedValue, Struct deserializedValue) { + LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>(); + serializedData.put(key, serializedValue); + expectRead(serializedData, Collections.singletonMap(key, deserializedValue)); + } + // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back // from the log. Validate the data that is captured when the conversion is performed matches the specified data // (by checking a single field's value) @@ -596,6 +828,15 @@ public class KafkaConfigBackingStoreTest { }); } + private void expectConnectorRemoval(String configKey, String targetStateKey) { + expectConvertWriteRead(configKey, KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null); + expectConvertWriteRead(targetStateKey, KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null); + + LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>(); + recordsToRead.put(configKey, null); + recordsToRead.put(targetStateKey, null); + expectReadToEnd(recordsToRead); + } private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized, final String dataFieldName, final Object dataFieldValue) { @@ -619,6 +860,9 @@ public class KafkaConfigBackingStoreTest { // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted private Map<String, Object> structToMap(Struct struct) { + if (struct == null) + return null; + HashMap<String, Object> result = new HashMap<>(); for (Field field : struct.schema().fields()) result.put(field.name(), struct.get(field));
