This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 188b86fa83e KAFKA-19793 Adding allow topic creation false for global 
and restore consumer (#20723)
188b86fa83e is described below

commit 188b86fa83ecd6e8c0a7462c5b17673b1e066eb1
Author: Arpit Goyal <[email protected]>
AuthorDate: Tue Nov 11 05:23:52 2025 +0530

    KAFKA-19793 Adding allow topic creation false for global and restore 
consumer (#20723)
    
    Disabled auto topic creation for both the restore and global consumers.
    
    Reviewers: Nikita Shupletsov <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 30 ++++-----
 .../apache/kafka/streams/StreamsConfigTest.java    | 73 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 24701a7de96..0c37d8b2ce4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -826,7 +826,7 @@ public class StreamsConfig extends AbstractConfig {
     private static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows 
maintainMs to ensure data is not deleted from the log prematurely. Allows for 
clock drift. Default is 1 day";
 
     private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
-        new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
ConsumerConfig.GROUP_PROTOCOL_CONFIG};
+        new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG};
     private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
         new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
     private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS =
@@ -1265,7 +1265,8 @@ public class StreamsConfig extends AbstractConfig {
         ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
-        ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
+        ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic",
+        ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
     );
 
     private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
@@ -1640,8 +1641,8 @@ public class StreamsConfig extends AbstractConfig {
 
         clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
+        checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
+        checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
 
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? 
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
         if (StreamsConfigUtils.eosEnabled(this)) {
@@ -1656,12 +1657,12 @@ public class StreamsConfig extends AbstractConfig {
         return consumerProps;
     }
 
-    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final 
Map<String, Object> clientProvidedProps,
-                                                              final String[] 
nonConfigurableConfigs) {
-        // Streams does not allow users to configure certain consumer/producer 
configurations, for example,
-        // enable.auto.commit. In cases where user tries to override such 
non-configurable
-        // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
-        // Thus, the default values for these consumer/producer configurations 
that are suitable for
+    private void checkIfUnexpectedUserSpecifiedClientConfig(final Map<String, 
Object> clientProvidedProps,
+                                                            final String[] 
nonConfigurableConfigs) {
+        // Streams does not allow users to configure certain client 
configurations (consumer/producer),
+        // for example, enable.auto.commit or transactional.id. In cases where 
user tries to override
+        // such non-configurable client configurations, log a warning and 
remove the user defined value
+        // from the Map. Thus, the default values for these client 
configurations that are suitable for
         // Streams will be used instead.
 
         final String nonConfigurableConfigMessage = "Unexpected user-specified 
{} config '{}' found. {} setting ({}) will be ignored and the Streams default 
setting ({}) will be used.";
@@ -1769,6 +1770,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // Get main consumer override configs
         final Map<String, Object> mainConsumerProps = 
originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+        checkIfUnexpectedUserSpecifiedClientConfig(mainConsumerProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
         consumerProps.putAll(mainConsumerProps);
 
         // this is a hack to work around StreamsConfig constructor inside 
StreamsPartitionAssignor to avoid casting
@@ -1799,9 +1801,6 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, 
getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
         consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, 
getString(TASK_ASSIGNOR_CLASS_CONFIG));
 
-        // disable auto topic creation
-        consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
-
         // verify that producer batch config is no larger than segment size, 
then add topic configs required for creating topics
         final Map<String, Object> topicProps = 
originalsWithPrefix(TOPIC_PREFIX, false);
         final Map<String, Object> producerProps = 
getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
@@ -1844,6 +1843,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // Get restore consumer override configs
         final Map<String, Object> restoreConsumerProps = 
originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+        checkIfUnexpectedUserSpecifiedClientConfig(restoreConsumerProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
         baseConsumerProps.putAll(restoreConsumerProps);
 
         // no need to set group id for a restore consumer
@@ -1877,6 +1877,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // Get global consumer override configs
         final Map<String, Object> globalConsumerProps = 
originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+        checkIfUnexpectedUserSpecifiedClientConfig(globalConsumerProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
         baseConsumerProps.putAll(globalConsumerProps);
 
         // no need to set group id for a global consumer
@@ -1887,6 +1888,7 @@ public class StreamsConfig extends AbstractConfig {
         // add client id with stream client id prefix
         baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-global-consumer");
         baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
         return baseConsumerProps;
     }
 
@@ -1903,7 +1905,7 @@ public class StreamsConfig extends AbstractConfig {
     public Map<String, Object> getProducerConfigs(final String clientId) {
         final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
-        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
+        checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, 
NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
 
         // generate producer configs from original properties and overridden 
maps
         final Map<String, Object> props = new HashMap<>(eosEnabled ? 
PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 65147a81101..ca4048ee9c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -442,6 +442,79 @@ public class StreamsConfigTest {
         assertTrue((boolean) 
returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
     }
 
+    @Test
+    public void 
shouldNotAllowAutoCreateTopicsForConsumers_WithCommonConsumerPrefix() {
+        // Test with generic consumer.* prefix (affects all consumer types)
+        
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
 "true");
+
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.ERROR);
+
+            final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+            // Main consumer - verify override is ignored
+            final Map<String, Object> mainConfigs = 
streamsConfig.getMainConsumerConfigs("group", "client", 0);
+            assertEquals("false", 
mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+                    "Main consumer should not allow auto topic creation with 
consumer.* override");
+
+            // Restore consumer - verify override is ignored
+            final Map<String, Object> restoreConfigs = 
streamsConfig.getRestoreConsumerConfigs("client");
+            assertEquals("false", 
restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+                    "Restore consumer should not allow auto topic creation 
with consumer.* override");
+
+            // Global consumer - verify override is ignored
+            final Map<String, Object> globalConfigs = 
streamsConfig.getGlobalConsumerConfigs("client");
+            assertEquals("false", 
globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+                    "Global consumer should not allow auto topic creation with 
consumer.* override");
+
+            // Verify exactly 3 errors are logged (consumer.* prefix is 
validated once in getCommonConsumerConfigs for each type of consumer)
+            final List<String> errorMessages = appender.getMessages();
+            final long errorCount = errorMessages.stream()
+                    .filter(msg -> msg.contains("Unexpected user-specified 
consumer config 'allow.auto.create.topics' found"))
+                    .count();
+            assertEquals(3, errorCount,
+                    "Should log exactly 3 error for consumer.* prefix");
+        }
+    }
+
+    @Test
+    public void 
shouldNotAllowAutoCreateTopicsForConsumers_WithSpecificConsumerPrefixes() {
+        // Test with specific prefixes for each consumer type
+        
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
 "true");
+        
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
 "true");
+        
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
 "true");
+
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.ERROR);
+
+            final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+            // Main consumer - verify override is ignored
+            final Map<String, Object> mainConfigs = 
streamsConfig.getMainConsumerConfigs("group", "client", 0);
+            assertEquals("false", 
mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+                    "Main consumer should not allow auto topic creation with 
main.consumer.* override");
+
+            // Restore consumer - verify override is ignored
+            final Map<String, Object> restoreConfigs = 
streamsConfig.getRestoreConsumerConfigs("client");
+            assertEquals("false", 
restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+                    "Restore consumer should not allow auto topic creation 
with restore.consumer.* override");
+
+            // Global consumer - verify override is ignored
+            final Map<String, Object> globalConfigs = 
streamsConfig.getGlobalConsumerConfigs("client");
+            assertEquals("false", 
globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
+                    "Global consumer should not allow auto topic creation with 
global.consumer.* override");
+
+            // Verify exactly 3 errors are logged (one for each specific 
prefix)
+            final List<String> errorMessages = appender.getMessages();
+            final long errorCount = errorMessages.stream()
+                    .filter(msg -> msg.contains("Unexpected user-specified 
consumer config 'allow.auto.create.topics' found"))
+                    .count();
+            assertEquals(3, errorCount,
+                    "Should log exactly 3 errors: one for main.consumer.*, one 
for restore.consumer.*, one for global.consumer.*");
+        }
+    }
+
+
     @Test
     public void shouldSupportNonPrefixedAdminConfigs() {
         props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);

Reply via email to