This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb7e3ab3757 KAFKA-20410: Register topic DLQ configs in LogConfig. [2/N] (#22167)
cb7e3ab3757 is described below
commit cb7e3ab375747bfafb97c746a2049443c2ddd9a1
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Apr 29 20:54:40 2026 +0530
KAFKA-20410: Register topic DLQ configs in LogConfig. [2/N] (#22167)
* Register TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG in
LogConfig.
* Update tests.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
---
core/src/test/scala/unit/kafka/log/LogConfigTest.scala | 1 +
.../java/org/apache/kafka/storage/internals/log/LogConfig.java | 8 ++++++++
2 files changed, 9 insertions(+)
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index ecef38cfbe7..16b276ff668 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -73,6 +73,7 @@ class LogConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "-0.1")
case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "remove", "0")
case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG =>
assertPropertyInvalid(name, "not_a_number", "remove", "0")
+ case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => assertPropertyInvalid(name, "not_a_boolean")
case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op
case _ => assertPropertyInvalid(name, "not_a_number", "-1")
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 1b1811ca5e7..7c5153825cc 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -248,6 +248,7 @@ public class LogConfig extends AbstractConfig {
TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN,
false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG,
BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
+ .define(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC)
.defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null,
null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
}
@@ -279,6 +280,7 @@ public class LogConfig extends AbstractConfig {
public final BrokerCompressionType compressionType;
public final Optional<Compression> compression;
public final boolean preallocate;
+ public final boolean errorsDeadletterqueueGroupEnable;
public final TimestampType messageTimestampType;
@@ -335,6 +337,7 @@ public class LogConfig extends AbstractConfig {
this.messageTimestampAfterMaxMs =
getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
this.leaderReplicationThrottledReplicas =
Collections.unmodifiableList(getList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
this.followerReplicationThrottledReplicas =
Collections.unmodifiableList(getList(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
+ this.errorsDeadletterqueueGroupEnable = getBoolean(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
remoteLogConfig = new RemoteLogConfig(this);
}
@@ -387,6 +390,10 @@ public class LogConfig extends AbstractConfig {
return 0;
}
+ public boolean errorsDeadletterqueueGroupEnable() {
+ return errorsDeadletterqueueGroupEnable;
+ }
+
public boolean remoteStorageEnable() {
return remoteLogConfig.remoteStorageEnable;
}
@@ -652,6 +659,7 @@ public class LogConfig extends AbstractConfig {
", followerReplicationThrottledReplicas=" +
followerReplicationThrottledReplicas +
", remoteLogConfig=" + remoteLogConfig +
", maxMessageSize=" + maxMessageSize +
+ ", errorsDeadletterqueueGroupEnable=" + errorsDeadletterqueueGroupEnable +
'}';
}