Randall Hauch created KAFKA-8370:
------------------------------------
Summary: Kafka Connect should check for existence of internal
topics before attempting to create them
Key: KAFKA-8370
URL: https://issues.apache.org/jira/browse/KAFKA-8370
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 0.11.0.0
Reporter: Randall Hauch
Assignee: Randall Hauch
The Connect worker doesn't currently check whether its internal topics exist.
Instead, it issues a CreateTopics request and handles the resulting
TopicExistsException. However, this can cause problems when the number of
available brokers is smaller than the replication factor, *even if the topic
already exists* and all of its partitions remain available on the remaining
brokers.
One problem with the current approach is that the broker validates the
requested replication factor before checking whether the topic exists, so the
worker gets an unexpected exception even when the topic does exist:
{noformat}
connect | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
connect |     at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
connect |     at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
connect |     at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
connect |     at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
connect |     at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
connect |     at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
connect |     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
connect |     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect |     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect |     at java.lang.Thread.run(Thread.java:748)
connect | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
connect |     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
connect |     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
connect |     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
connect |     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
connect |     at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
connect |     ... 11 more
connect | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
connect | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
{noformat}
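The failure in the log above can be reproduced with a simplified, self-contained Java model. ModelBroker, CreateThenCatch, TopicExists and InvalidReplicationFactor are hypothetical stand-ins, not Kafka's TopicAdmin, AdminClient or exception classes; the model assumes only what this issue describes, namely that the broker validates the replication factor before checking whether the topic exists.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-ins for Kafka's TopicExistsException and
// InvalidReplicationFactorException, so the sketch needs no kafka-clients jar.
class TopicExists extends RuntimeException {
    TopicExists(String message) { super(message); }
}

class InvalidReplicationFactor extends RuntimeException {
    InvalidReplicationFactor(String message) { super(message); }
}

// Simplified model of the broker behavior described in this issue:
// the replication factor is validated BEFORE topic existence is checked.
class ModelBroker {
    private final Set<String> topics = new HashSet<>();
    private final int liveBrokers;

    ModelBroker(int liveBrokers, String... existingTopics) {
        this.liveBrokers = liveBrokers;
        Collections.addAll(topics, existingTopics);
    }

    void createTopic(String name, short replicationFactor) {
        if (replicationFactor > liveBrokers) {
            throw new InvalidReplicationFactor("Replication factor: " + replicationFactor
                    + " larger than available brokers: " + liveBrokers + ".");
        }
        if (!topics.add(name)) {
            throw new TopicExists("Topic '" + name + "' already exists.");
        }
    }
}

// The worker's current strategy: always send a create request and treat
// "topic exists" as success. Any other error (such as InvalidReplicationFactor)
// propagates to the caller. Returns true only if the topic was newly created.
class CreateThenCatch {
    static boolean createOrFind(ModelBroker broker, String topic, short replicationFactor) {
        try {
            broker.createTopic(topic, replicationFactor);
            return true;
        } catch (TopicExists e) {
            return false; // topic was already there
        }
    }
}
```

With two live brokers and an existing connect-offsets topic, createOrFind(broker, "connect-offsets", (short) 3) throws InvalidReplicationFactor instead of returning false, which mirrors the herder thread dying in the log.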
Instead of always issuing a CreateTopics request, the worker's admin client
should first check whether each topic exists and *only then*, if it does not,
attempt to create it.
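A minimal sketch of the proposed check-then-create flow follows. InternalTopicAdmin, InMemoryCluster, CheckThenCreate and the two exception classes are hypothetical names for illustration; with Kafka's Java AdminClient the listing step would roughly correspond to listTopics().names() and the create step to createTopics(...), but this sketch does not call that client. The in-memory cluster assumes, as the issue states, that the replication factor is validated before existence, and the topic names used with it are only examples.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-ins for Kafka's TopicExistsException and
// InvalidReplicationFactorException.
class TopicAlreadyExists extends RuntimeException {
    TopicAlreadyExists(String message) { super(message); }
}

class ReplicationFactorTooLarge extends RuntimeException {
    ReplicationFactorTooLarge(String message) { super(message); }
}

// Hypothetical minimal admin interface covering the two operations needed.
interface InternalTopicAdmin {
    Set<String> listTopicNames();
    void createTopic(String name, short replicationFactor);
}

// In-memory cluster that validates the replication factor before checking
// whether the topic exists, as the broker in this issue does.
class InMemoryCluster implements InternalTopicAdmin {
    private final Set<String> topics = new HashSet<>();
    private final int liveBrokers;
    int createRequests = 0; // number of create calls received, for inspection

    InMemoryCluster(int liveBrokers, String... existingTopics) {
        this.liveBrokers = liveBrokers;
        Collections.addAll(topics, existingTopics);
    }

    @Override
    public Set<String> listTopicNames() {
        return new HashSet<>(topics);
    }

    @Override
    public void createTopic(String name, short replicationFactor) {
        createRequests++;
        if (replicationFactor > liveBrokers) {
            throw new ReplicationFactorTooLarge("Replication factor: " + replicationFactor
                    + " larger than available brokers: " + liveBrokers + ".");
        }
        if (!topics.add(name)) {
            throw new TopicAlreadyExists("Topic '" + name + "' already exists.");
        }
    }
}

class CheckThenCreate {
    // Creates only the wanted topics that are missing; returns the ones it created.
    static Set<String> ensureTopics(InternalTopicAdmin admin, Set<String> wanted, short replicationFactor) {
        Set<String> existing = admin.listTopicNames();
        Set<String> created = new LinkedHashSet<>();
        for (String topic : wanted) {
            if (existing.contains(topic)) {
                continue; // already present: no create request, so no replication-factor check
            }
            try {
                admin.createTopic(topic, replicationFactor);
                created.add(topic);
            } catch (TopicAlreadyExists e) {
                // Another worker may have created it between the listing and this call.
            }
        }
        return created;
    }
}
```

The existence check is not atomic with the create request: another worker could create the topic in between, which is why the sketch still tolerates the "already exists" error instead of dropping that handling. When a topic is genuinely missing and too few brokers are live, the create request still fails, so the worker does not silently run without its internal topics.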