This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 484ff65  KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
484ff65 is described below

commit 484ff65cd31587ff3f1bf36e178985c91bed9d08
Author: Evelyn Bayes <[email protected]>
AuthorDate: Mon Jun 8 05:42:00 2020 +1000

    KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
    
    Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
    This is fine unless someone has already created that topic manually with the wrong partition count.
    
    In such a case Kafka Connect may run for some time, especially in standalone mode, but once
    switched to distributed mode it will almost certainly fail.
    
    This commit adds a check when the KafkaConfigBackingStore is starting.
    This check will throw a ConfigException if there is more than one partition in the backing store.
    
    This exception is then caught upstream and logged by either:
    - DistributedHerder#run
    - ConnectStandalone#main
    
    A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.
    
    Author: Evelyn Bayes <[email protected]>
    Co-authored-by: Randall Hauch <[email protected]>
    
    Reviewer: Konstantine Karantasis <[email protected]>
---
 .../connect/storage/KafkaConfigBackingStore.java   | 10 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 +++
 .../storage/KafkaConfigBackingStoreTest.java       | 36 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)
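
Since the failure mode described above stems from a config topic that was pre-created by hand with the wrong partition count, here is a minimal sketch (not part of this commit) of how that topic can be pre-created correctly with the Java Admin client. The topic name "connect-configs", the bootstrap address, and the replication factor are assumptions; adjust them to match the worker's config.storage.topic setting and your cluster:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;

    public class CreateConnectConfigTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumption: point at your cluster
            try (AdminClient admin = AdminClient.create(props)) {
                // A single partition, which is what the check added in this commit enforces;
                // Connect's config topic should also be log-compacted.
                NewTopic configTopic = new NewTopic("connect-configs", 1, (short) 3)
                        .configs(Collections.singletonMap("cleanup.policy", "compact"));
                admin.createTopics(Collections.singleton(configTopic)).all().get();
            }
        }
    }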

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 60c2665..9512e03 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -265,6 +265,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
         configLog.start();
+
+        int partitionCount = configLog.partitionCount();
+        if (partitionCount > 1) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have a single partition in order to guarantee consistency of "
+                    + "connector configurations, but found %d partitions.",
+                    topic, DistributedConfig.CONFIG_TOPIC_CONFIG, partitionCount);
+            throw new ConfigException(msg);
+        }
+
         started = true;
         log.info("Started KafkaConfigBackingStore");
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e301581..69d2588 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -74,6 +74,7 @@ public class KafkaBasedLog<K, V> {
 
     private Time time;
     private final String topic;
+    private int partitionCount;
     private final Map<String, Object> producerConfigs;
     private final Map<String, Object> consumerConfigs;
     private final Callback<ConsumerRecord<K, V>> consumedCallback;
@@ -145,6 +146,7 @@ public class KafkaBasedLog<K, V> {
 
         for (PartitionInfo partition : partitionInfos)
             partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        partitionCount = partitions.size();
         consumer.assign(partitions);
 
         // Always consume from the beginning of all partitions. Necessary to ensure that we don't use committed offsets
@@ -238,6 +240,9 @@ public class KafkaBasedLog<K, V> {
         producer.send(new ProducerRecord<>(topic, key, value), callback);
     }
 
+    public int partitionCount() {
+        return partitionCount;
+    }
 
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 8d78219..a46f6fe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -60,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -161,6 +163,7 @@ public class KafkaConfigBackingStoreTest {
     public void testStartStop() throws Exception {
         expectConfigure();
         expectStart(Collections.emptyList(), Collections.emptyMap());
+        expectPartitionCount(1);
         expectStop();
         PowerMock.replayAll();
 
@@ -208,6 +211,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -276,6 +280,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -360,6 +365,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(4));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -421,6 +427,7 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
         expectReadToEnd(serializedConfigs);
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -474,6 +481,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Shouldn't see any callbacks since this is during startup
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -516,6 +524,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -566,6 +575,7 @@ public class KafkaConfigBackingStoreTest {
         configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(0));
         EasyMock.expectLastCall();
 
+        expectPartitionCount(1);
         expectStop();
 
         PowerMock.replayAll();
@@ -611,6 +621,7 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 5;
 
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -658,6 +669,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 7;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -712,6 +724,7 @@ public class KafkaConfigBackingStoreTest {
 
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -759,6 +772,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
         logOffset = 8;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Shouldn't see any callbacks since this is during startup
 
@@ -806,6 +820,7 @@ public class KafkaConfigBackingStoreTest {
         deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
         logOffset = 6;
         expectStart(existingRecords, deserialized);
+        expectPartitionCount(1);
 
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<>());
@@ -860,6 +875,22 @@ public class KafkaConfigBackingStoreTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testExceptionOnStartWhenConfigTopicHasMultiplePartitions() throws Exception {
+        expectConfigure();
+        expectStart(Collections.emptyList(), Collections.emptyMap());
+
+        expectPartitionCount(2);
+
+        PowerMock.replayAll();
+
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+        ConfigException e = assertThrows(ConfigException.class, () -> configStorage.start());
+        assertTrue(e.getMessage().contains("required to have a single partition"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectConfigure() throws Exception {
         PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
                 EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
@@ -868,6 +899,11 @@ public class KafkaConfigBackingStoreTest {
                 .andReturn(storeLog);
     }
 
+    private void expectPartitionCount(int partitionCount) {
+        EasyMock.expect(storeLog.partitionCount())
+                .andReturn(partitionCount);
+    }
+
     // If non-empty, deserializations should be a LinkedHashMap
     private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                              final Map<byte[], Struct> deserializations) {
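
For operators who want to verify an existing cluster before picking up this change, here is a minimal sketch (again, not part of this commit) that checks the partition count of the config topic with the Admin client, mirroring the partitionCount() check that KafkaConfigBackingStore.start() now performs in the diff above. The topic name "connect-configs" and the bootstrap address are assumptions:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    import java.util.Collections;
    import java.util.Properties;

    public class CheckConnectConfigTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // assumption: point at your cluster
            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription description = admin.describeTopics(Collections.singleton("connect-configs"))
                        .all().get().get("connect-configs");
                int partitions = description.partitions().size();
                if (partitions > 1) {
                    // With this commit, a worker will fail start() with a ConfigException in this situation.
                    System.err.println("Config topic has " + partitions + " partitions; expected exactly 1.");
                }
            }
        }
    }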
