This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 188b86fa83e KAFKA-19793 Adding allow topic creation false for global
and restore consumer (#20723)
188b86fa83e is described below
commit 188b86fa83ecd6e8c0a7462c5b17673b1e066eb1
Author: Arpit Goyal <[email protected]>
AuthorDate: Tue Nov 11 05:23:52 2025 +0530
KAFKA-19793 Adding allow topic creation false for global and restore
consumer (#20723)
Disabled automatic topic creation (allow.auto.create.topics=false) for both the restore and global consumers, and made the setting non-overridable by users.
Reviewers: Nikita Shupletsov <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 30 ++++-----
.../apache/kafka/streams/StreamsConfigTest.java | 73 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 14 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 24701a7de96..0c37d8b2ce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -826,7 +826,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows
maintainMs to ensure data is not deleted from the log prematurely. Allows for
clock drift. Default is 1 day";
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
- new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
ConsumerConfig.GROUP_PROTOCOL_CONFIG};
+ new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
ConsumerConfig.GROUP_PROTOCOL_CONFIG,
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS =
@@ -1265,7 +1265,8 @@ public class StreamsConfig extends AbstractConfig {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
- ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic",
+ ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
);
private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
@@ -1640,8 +1641,8 @@ public class StreamsConfig extends AbstractConfig {
clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);
- checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
- checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
+ checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
+ checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps,
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ?
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
if (StreamsConfigUtils.eosEnabled(this)) {
@@ -1656,12 +1657,12 @@ public class StreamsConfig extends AbstractConfig {
return consumerProps;
}
- private void checkIfUnexpectedUserSpecifiedConsumerConfig(final
Map<String, Object> clientProvidedProps,
- final String[]
nonConfigurableConfigs) {
- // Streams does not allow users to configure certain consumer/producer
configurations, for example,
- // enable.auto.commit. In cases where user tries to override such
non-configurable
- // consumer/producer configurations, log a warning and remove the user
defined value from the Map.
- // Thus, the default values for these consumer/producer configurations
that are suitable for
+ private void checkIfUnexpectedUserSpecifiedClientConfig(final Map<String,
Object> clientProvidedProps,
+ final String[]
nonConfigurableConfigs) {
+ // Streams does not allow users to configure certain client
configurations (consumer/producer),
+ // for example, enable.auto.commit or transactional.id. In cases where
a user tries to override
+ // such non-configurable client configurations, log a warning and
remove the user-defined value
+ // from the Map. Thus, the default values for these client
configurations that are suitable for
// Streams will be used instead.
final String nonConfigurableConfigMessage = "Unexpected user-specified
{} config '{}' found. {} setting ({}) will be ignored and the Streams default
setting ({}) will be used.";
@@ -1769,6 +1770,7 @@ public class StreamsConfig extends AbstractConfig {
// Get main consumer override configs
final Map<String, Object> mainConsumerProps =
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+ checkIfUnexpectedUserSpecifiedClientConfig(mainConsumerProps,
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
consumerProps.putAll(mainConsumerProps);
// this is a hack to work around StreamsConfig constructor inside
StreamsPartitionAssignor to avoid casting
@@ -1799,9 +1801,6 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG,
getString(TASK_ASSIGNOR_CLASS_CONFIG));
- // disable auto topic creation
- consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
-
// verify that producer batch config is no larger than segment size,
then add topic configs required for creating topics
final Map<String, Object> topicProps =
originalsWithPrefix(TOPIC_PREFIX, false);
final Map<String, Object> producerProps =
getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
@@ -1844,6 +1843,7 @@ public class StreamsConfig extends AbstractConfig {
// Get restore consumer override configs
final Map<String, Object> restoreConsumerProps =
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+ checkIfUnexpectedUserSpecifiedClientConfig(restoreConsumerProps,
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
baseConsumerProps.putAll(restoreConsumerProps);
// no need to set group id for a restore consumer
@@ -1877,6 +1877,7 @@ public class StreamsConfig extends AbstractConfig {
// Get global consumer override configs
final Map<String, Object> globalConsumerProps =
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+ checkIfUnexpectedUserSpecifiedClientConfig(globalConsumerProps,
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
baseConsumerProps.putAll(globalConsumerProps);
// no need to set group id for a global consumer
@@ -1887,6 +1888,7 @@ public class StreamsConfig extends AbstractConfig {
// add client id with stream client id prefix
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId +
"-global-consumer");
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
return baseConsumerProps;
}
@@ -1903,7 +1905,7 @@ public class StreamsConfig extends AbstractConfig {
public Map<String, Object> getProducerConfigs(final String clientId) {
final Map<String, Object> clientProvidedProps =
getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
- checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps,
NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
+ checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps,
NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
// generate producer configs from original properties and overridden
maps
final Map<String, Object> props = new HashMap<>(eosEnabled ?
PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 65147a81101..ca4048ee9c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -442,6 +442,79 @@ public class StreamsConfigTest {
assertTrue((boolean)
returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
}
+ @Test
+ public void
shouldNotAllowAutoCreateTopicsForConsumers_WithCommonConsumerPrefix() {
+ // Test with generic consumer.* prefix (affects all consumer types)
+
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"true");
+
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.ERROR);
+
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ // Main consumer - verify override is ignored
+ final Map<String, Object> mainConfigs =
streamsConfig.getMainConsumerConfigs("group", "client", 0);
+ assertEquals("false",
mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ "Main consumer should not allow auto topic creation with
consumer.* override");
+
+ // Restore consumer - verify override is ignored
+ final Map<String, Object> restoreConfigs =
streamsConfig.getRestoreConsumerConfigs("client");
+ assertEquals("false",
restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ "Restore consumer should not allow auto topic creation
with consumer.* override");
+
+ // Global consumer - verify override is ignored
+ final Map<String, Object> globalConfigs =
streamsConfig.getGlobalConsumerConfigs("client");
+ assertEquals("false",
globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ "Global consumer should not allow auto topic creation with
consumer.* override");
+
+ // Verify exactly 3 errors are logged (consumer.* prefix is
validated once in getCommonConsumerConfigs for each type of consumer)
+ final List<String> errorMessages = appender.getMessages();
+ final long errorCount = errorMessages.stream()
+ .filter(msg -> msg.contains("Unexpected user-specified
consumer config 'allow.auto.create.topics' found"))
+ .count();
+ assertEquals(3, errorCount,
+ "Should log exactly 3 error for consumer.* prefix");
+ }
+ }
+
+ @Test
+ public void
shouldNotAllowAutoCreateTopicsForConsumers_WithSpecificConsumerPrefixes() {
+ // Test with specific prefixes for each consumer type
+
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"true");
+
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"true");
+
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
"true");
+
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.ERROR);
+
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ // Main consumer - verify override is ignored
+ final Map<String, Object> mainConfigs =
streamsConfig.getMainConsumerConfigs("group", "client", 0);
+ assertEquals("false",
mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ "Main consumer should not allow auto topic creation with
main.consumer.* override");
+
+ // Restore consumer - verify override is ignored
+ final Map<String, Object> restoreConfigs =
streamsConfig.getRestoreConsumerConfigs("client");
+ assertEquals("false",
restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ "Restore consumer should not allow auto topic creation
with restore.consumer.* override");
+
+ // Global consumer - verify override is ignored
+ final Map<String, Object> globalConfigs =
streamsConfig.getGlobalConsumerConfigs("client");
+ assertEquals("false",
globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+ "Global consumer should not allow auto topic creation with
global.consumer.* override");
+
+ // Verify exactly 3 errors are logged (one for each specific
prefix)
+ final List<String> errorMessages = appender.getMessages();
+ final long errorCount = errorMessages.stream()
+ .filter(msg -> msg.contains("Unexpected user-specified
consumer config 'allow.auto.create.topics' found"))
+ .count();
+ assertEquals(3, errorCount,
+ "Should log exactly 3 errors: one for main.consumer.*, one
for restore.consumer.*, one for global.consumer.*");
+ }
+ }
+
+
@Test
public void shouldSupportNonPrefixedAdminConfigs() {
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);