[ https://issues.apache.org/jira/browse/KAFKA-7002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505424#comment-16505424 ]
ASF GitHub Bot commented on KAFKA-7002: --------------------------------------- ewencp closed pull request #5145: KAFKA-7002: Add a config property for DLQ topic's replication factor (KIP-298) URL: https://github.com/apache/kafka/pull/5145 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index 9629f8f0e42..6e9bd6b9e71 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -52,10 +52,16 @@ public static final String DLQ_TOPIC_DEFAULT = ""; private static final String DLQ_TOPIC_DISPLAY = "Dead Letter Queue Topic Name"; + public static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG = DLQ_PREFIX + "topic.replication.factor"; + private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used to create the dead letter queue topic when it doesn't already exist."; + public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3; + private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor"; + static ConfigDef config = ConnectorConfig.configDef() .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) - .define(DLQ_TOPIC_NAME_CONFIG, 
ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY); + .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY) + .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY); public static ConfigDef configDef() { return config; @@ -97,4 +103,8 @@ public static boolean hasTopicsRegexConfig(Map<String, String> props) { public String dlqTopicName() { return getString(DLQ_TOPIC_NAME_CONFIG); } + + public short dlqTopicReplicationFactor() { + return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index 9a8a9afb29b..459eeae1ff4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java @@ -44,7 +44,6 @@ private static final Logger log = LoggerFactory.getLogger(DeadLetterQueueReporter.class); - private static final short DLQ_MAX_DESIRED_REPLICATION_FACTOR = 3; private static final int DLQ_NUM_DESIRED_PARTITIONS = 1; private final SinkConnectorConfig connConfig; @@ -53,13 +52,13 @@ private ErrorHandlingMetrics errorHandlingMetrics; public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, - SinkConnectorConfig connConfig, Map<String, Object> producerProps) { - String topic = connConfig.dlqTopicName(); + SinkConnectorConfig sinkConfig, Map<String, Object> producerProps) { + String topic = 
sinkConfig.dlqTopicName(); try (AdminClient admin = AdminClient.create(workerConfig.originals())) { if (!admin.listTopics().names().get().contains(topic)) { log.error("Topic {} doesn't exist. Will attempt to create topic.", topic); - NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, DLQ_MAX_DESIRED_REPLICATION_FACTOR); + NewTopic schemaTopicRequest = new NewTopic(topic, DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor()); admin.createTopics(singleton(schemaTopicRequest)).all().get(); } } catch (InterruptedException e) { @@ -71,7 +70,7 @@ public static DeadLetterQueueReporter createAndSetup(WorkerConfig workerConfig, } KafkaProducer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); - return new DeadLetterQueueReporter(dlqProducer, connConfig); + return new DeadLetterQueueReporter(dlqProducer, sinkConfig); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java index b5410d071d8..f35c514816f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java @@ -180,6 +180,15 @@ public void testLogMessageWithSinkRecords() { "partition=5, offset=100}.", msg); } + @Test + public void testSetDLQConfigs() { + SinkConnectorConfig configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)); + assertEquals(configuration.dlqTopicName(), DLQ_TOPIC); + + configuration = config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "7")); + assertEquals(configuration.dlqTopicReplicationFactor(), 7); + } + private ProcessingContext processingContext() { ProcessingContext context = new ProcessingContext(); context.consumerRecord(new ConsumerRecord<>(TOPIC, 5, 100, new byte[]{'a', 'b'}, new 
byte[]{'x'})); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow replication factor to be set via a configuration property for the > Connect DLQ topic > ----------------------------------------------------------------------------------------- > > Key: KAFKA-7002 > URL: https://issues.apache.org/jira/browse/KAFKA-7002 > Project: Kafka > Issue Type: Task > Components: KafkaConnect > Affects Versions: 2.0.0 > Reporter: Arjun Satish > Assignee: Arjun Satish > Priority: Major > Fix For: 2.0.0, 2.1.0 > > > Currently, the replication factor is hardcoded to a value of 3. This means > that we cannot use a DLQ in any cluster setup with fewer than three brokers. > It is better to have the user specify this value if the default value does > not meet the requirements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)