Repository: kafka
Updated Branches:
  refs/heads/0.10.0 18f643b82 -> 1ee36c940


KAFKA-3674: Ensure connector target state changes propagated to worker

Author: Jason Gustafson <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1341 from hachikuji/KAFKA-3674

(cherry picked from commit 8911660e2e7d9553502974393ad1aa04852c2da2)
Signed-off-by: Ewen Cheslack-Postava <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ee36c94
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ee36c94
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ee36c94

Branch: refs/heads/0.10.0
Commit: 1ee36c940ee656e575b8a0aa7cc66df380bb0978
Parents: 18f643b
Author: Jason Gustafson <[email protected]>
Authored: Mon May 9 00:12:30 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Mon May 9 00:12:56 2016 -0700

----------------------------------------------------------------------
 .../runtime/distributed/DistributedHerder.java  |  22 +-
 .../storage/KafkaConfigBackingStore.java        |  30 ++-
 .../connect/runtime/WorkerSourceTaskTest.java   |   4 +-
 .../distributed/DistributedHerderTest.java      | 204 +++++++++++++++
 .../storage/KafkaConfigBackingStoreTest.java    | 258 ++++++++++++++++++-
 5 files changed, 498 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ee36c94/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a2beff3..afabbeb 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -309,15 +309,21 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     }
 
     private void processTargetStateChanges(Set<String> 
connectorTargetStateChanges) {
-        if (!connectorTargetStateChanges.isEmpty()) {
-            for (String connector : connectorTargetStateChanges) {
-                if (worker.connectorNames().contains(connector)) {
-                    TargetState targetState = 
configState.targetState(connector);
-                    worker.setTargetState(connector, targetState);
-                    if (targetState == TargetState.STARTED)
-                        reconfigureConnectorTasksWithRetry(connector);
-                }
+        for (String connector : connectorTargetStateChanges) {
+            TargetState targetState = configState.targetState(connector);
+            if (!configState.connectors().contains(connector)) {
+                log.debug("Received target state change for unknown connector: 
{}", connector);
+                continue;
             }
+
+            // we must propagate the state change to the worker so that the 
connector's
+            // tasks can transition to the new target state
+            worker.setTargetState(connector, targetState);
+
+            // additionally, if the worker is running the connector itself, 
then we need to
+            // request reconfiguration to ensure that config changes while 
paused take effect
+            if (worker.ownsConnector(connector) && targetState == 
TargetState.STARTED)
+                reconfigureConnectorTasksWithRetry(connector);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ee36c94/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 9412e42..a894f31 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -317,8 +317,14 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     @Override
     public void removeConnectorConfig(String connector) {
         log.debug("Removing connector configuration for connector {}", 
connector);
-        updateConnectorConfig(connector, null);
-        configLog.send(TARGET_STATE_KEY(connector), null);
+        try {
+            configLog.send(CONNECTOR_KEY(connector), null);
+            configLog.send(TARGET_STATE_KEY(connector), null);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            log.error("Failed to remove connector configuration from Kafka: ", 
e);
+            throw new ConnectException("Error removing connector configuration 
from Kafka", e);
+        }
     }
 
     @Override
@@ -437,8 +443,19 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
             if (record.key().startsWith(TARGET_STATE_PREFIX)) {
                 String connectorName = 
record.key().substring(TARGET_STATE_PREFIX.length());
+                boolean removed = false;
                 synchronized (lock) {
-                    if (value.value() != null) {
+                    if (value.value() == null) {
+                        // When connector configs are removed, we also write 
tombstones for the target state.
+                        log.debug("Removed target state for connector {} due 
to null value in topic.", connectorName);
+                        connectorTargetStates.remove(connectorName);
+                        removed = true;
+
+                        // If for some reason we still have configs for the 
connector, add back the default
+                        // STARTED state to ensure each connector always has a 
valid target state.
+                        if (connectorConfigs.containsKey(connectorName))
+                            connectorTargetStates.put(connectorName, 
TargetState.STARTED);
+                    } else {
                         if (!(value.value() instanceof Map)) {
                             log.error("Found target state ({}) in wrong 
format: {}",  record.key(), value.value().getClass());
                             return;
@@ -461,8 +478,11 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
                     }
                 }
 
-                if (!starting)
+                // Note that we do not notify the update listener if the 
target state has been removed.
+                // Instead we depend on the removal callback of the connector 
config itself to notify the worker.
+                if (!starting && !removed)
                     updateListener.onConnectorTargetStateChange(connectorName);
+
             } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
                 String connectorName = 
record.key().substring(CONNECTOR_PREFIX.length());
                 boolean removed = false;
@@ -487,6 +507,8 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
                         log.debug("Updating configuration for connector " + 
connectorName + " configuration: " + newConnectorConfig);
                         connectorConfigs.put(connectorName, (Map<String, 
String>) newConnectorConfig);
 
+                        // Set the initial state of the connector to STARTED, 
which ensures that any connectors
+                        // which were created with 0.9 Connect will be 
initialized in the STARTED state.
                         if (!connectorTargetStates.containsKey(connectorName))
                             connectorTargetStates.put(connectorName, 
TargetState.STARTED);
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ee36c94/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 0d805da..0768781 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -203,7 +203,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         int priorCount = count.get();
         Thread.sleep(100);
-        assertEquals(priorCount, count.get());
+
+        // since the transition is observed asynchronously, the count could be 
off by one loop iteration
+        assertTrue(count.get() - priorCount <= 1);
 
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ee36c94/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index fbccc55..81e6be8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -130,6 +130,9 @@ public class DistributedHerderTest {
     private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
             Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
             TASK_CONFIGS_MAP, Collections.<String>emptySet());
+    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.PAUSED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = 
new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
             Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), 
Collections.singletonMap(CONN1, TargetState.STARTED),
             TASK_CONFIGS_MAP, Collections.<String>emptySet());
@@ -747,6 +750,207 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testConnectorPaused() throws Exception {
+        // ensure that target state changes are propagated to the worker
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
+
+        worker.setTargetState(CONN1, TargetState.PAUSED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to paused
+        herder.tick(); // worker should apply the state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorResumed() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // start with the connector paused
+        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        PowerMock.expectLastCall();
+
+        // we expect reconfiguration after resuming
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+
+        worker.setTargetState(CONN1, TargetState.STARTED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to started
+        herder.tick(); // apply state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testUnknownConnectorPaused() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // state change is ignored since we have no target state
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange("unknown-connector");
+        herder.tick(); // continue
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorPausedRunningTaskOnly() throws Exception {
+        // even if we don't own the connector, we should still propagate 
target state
+        // changes to the worker so that tasks will transition correctly
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
+
+        // join
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
+
+        worker.setTargetState(CONN1, TargetState.PAUSED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to paused
+        herder.tick(); // apply state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorResumedRunningTaskOnly() throws Exception {
+        // even if we don't own the connector, we should still propagate 
target state
+        // changes to the worker so that tasks will transition correctly
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
+
+        // join
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.singletonList(TASK0));
+        expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), 
EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // handle the state change
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
+
+        worker.setTargetState(CONN1, TargetState.STARTED);
+        PowerMock.expectLastCall();
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        configUpdateListener.onConnectorTargetStateChange(CONN1); // state 
changes to started
+        herder.tick(); // apply state change
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testTaskConfigAdded() {
         // Task config always requires rebalance
         EasyMock.expect(member.memberId()).andStubReturn("member");

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ee36c94/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 617177e..f5bce8f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
@@ -53,9 +54,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaConfigBackingStore.class)
@@ -107,6 +110,7 @@ public class KafkaConfigBackingStoreTest {
             new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(0)),
             new 
Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(1))
     );
+    private static final Struct TARGET_STATE_PAUSED = new 
Struct(KafkaConfigBackingStore.TARGET_STATE_V0).put("state", "PAUSED");
 
     private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
             = new 
Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
@@ -181,15 +185,10 @@ public class KafkaConfigBackingStoreTest {
         EasyMock.expectLastCall();
 
         // Config deletion
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(1), 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
+        expectConnectorRemoval(CONNECTOR_CONFIG_KEYS.get(1), 
TARGET_STATE_KEYS.get(1));
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
-        // Target state deletion
-        storeLog.send(TARGET_STATE_KEYS.get(1), null);
-        PowerMock.expectLastCall();
-
         expectStop();
 
         PowerMock.replayAll();
@@ -220,9 +219,10 @@ public class KafkaConfigBackingStoreTest {
         // Deletion should remove the second one we added
         configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
         configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
+        assertEquals(4, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+        assertNull(configState.targetState(CONNECTOR_IDS.get(1)));
 
         configStorage.stop();
 
@@ -346,6 +346,176 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testRestoreTargetState() throws Exception {
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(4)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), TARGET_STATE_PAUSED);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector with initial state paused
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.PAUSED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testBackgroundUpdateTargetState() throws Exception {
+        // verify that we handle target state changes correctly when they come 
up through the log
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        expectRead(TARGET_STATE_KEYS.get(0), CONFIGS_SERIALIZED.get(0), 
TARGET_STATE_PAUSED);
+
+        
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector with initial state started
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testBackgroundConnectorDeletion() throws Exception {
+        // verify that we handle connector deletions correctly when they come 
up through the log
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+        serializedData.put(CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
+        serializedData.put(TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1));
+
+        Map<String, Struct> deserializedData = new HashMap<>();
+        deserializedData.put(CONNECTOR_CONFIG_KEYS.get(0), null);
+        deserializedData.put(TARGET_STATE_KEYS.get(0), null);
+
+        expectRead(serializedData, deserializedData);
+
+        configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector with initial state started
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.refresh(0, TimeUnit.SECONDS);
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestoreTargetStateUnexpectedDeletion() throws Exception {
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(4)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        logOffset = 5;
+
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // The target state deletion should reset the state to STARTED
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRestore() throws Exception {
         // Restoring data should notify only of the latest values after 
loading is complete. This also validates
         // that inconsistent state is ignored.
@@ -385,6 +555,7 @@ public class KafkaConfigBackingStoreTest {
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(7, configState.offset()); // Should always be next to be 
read, even if uncommitted
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
+        assertEquals(TargetState.STARTED, 
configState.targetState(CONNECTOR_IDS.get(0)));
         // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
         assertEquals(SAMPLE_CONFIGS.get(2), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         // Should see 2 tasks for that connector. Only config updates before 
the root key update should be reflected
@@ -400,6 +571,51 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testRestoreConnectorDeletion() throws Exception {
+        // Restoring data should notify only of the latest values after 
loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data 
after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, TARGET_STATE_KEYS.get(0), 
CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, 
TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(5)));
+
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(4), null);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one 
seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be 
read, even if uncommitted
+        assertTrue(configState.connectors().isEmpty());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRestoreZeroTasks() throws Exception {
         // Restoring data should notify only of the latest values after 
loading is complete. This also validates
         // that inconsistent state is ignored.
@@ -558,6 +774,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.expectLastCall();
     }
 
+    private void expectRead(LinkedHashMap<String, byte[]> serializedValues,
+                            Map<String, Struct> deserializedValues) {
+        expectReadToEnd(serializedValues);
+        for (Map.Entry<String, Struct> deserializedValueEntry : 
deserializedValues.entrySet()) {
+            byte[] serializedValue = 
serializedValues.get(deserializedValueEntry.getKey());
+            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), 
EasyMock.aryEq(serializedValue)))
+                    .andReturn(new SchemaAndValue(null, 
structToMap(deserializedValueEntry.getValue())));
+        }
+    }
+
+    private void expectRead(final String key, final byte[] serializedValue, 
Struct deserializedValue) {
+        LinkedHashMap<String, byte[]> serializedData = new LinkedHashMap<>();
+        serializedData.put(key, serializedValue);
+        expectRead(serializedData, Collections.singletonMap(key, 
deserializedValue));
+    }
+
     // Expect a conversion & write to the underlying log, followed by a 
subsequent read when the data is consumed back
     // from the log. Validate the data that is captured when the conversion is 
performed matches the specified data
     // (by checking a single field's value)
@@ -596,6 +828,15 @@ public class KafkaConfigBackingStoreTest {
                 });
     }
 
+    private void expectConnectorRemoval(String configKey, String 
targetStateKey) {
+        expectConvertWriteRead(configKey, 
KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
+        expectConvertWriteRead(targetStateKey, 
KafkaConfigBackingStore.TARGET_STATE_V0, null, null, null);
+
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, null);
+        recordsToRead.put(targetStateKey, null);
+        expectReadToEnd(recordsToRead);
+    }
 
     private void expectConvertWriteAndRead(final String configKey, final 
Schema valueSchema, final byte[] serialized,
                                            final String dataFieldName, final 
Object dataFieldValue) {
@@ -619,6 +860,9 @@ public class KafkaConfigBackingStoreTest {
 
     // Generates a Map representation of Struct. Only does shallow traversal, 
so nested structs are not converted
     private Map<String, Object> structToMap(Struct struct) {
+        if (struct == null)
+            return null;
+
         HashMap<String, Object> result = new HashMap<>();
         for (Field field : struct.schema().fields())
             result.put(field.name(), struct.get(field));

Reply via email to