This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new f7642748f80 Add group.protocol config handling for streams (#18033)
f7642748f80 is described below

commit f7642748f800ef441379c42fb1772522a9e8c130
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 4 13:25:09 2024 +0100

    Add group.protocol config handling for streams (#18033)
    
    - accept new streams group.protocol value
    - do not forward to consumer clients
    - log a warning that the streams protocol is not ready for production use
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 22 +++++++++++++++---
 .../apache/kafka/streams/StreamsConfigTest.java    | 26 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 476e582933e..a3200376ace 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -841,6 +841,8 @@ public class StreamsConfig extends AbstractConfig {
     public static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = 
"windowstore.changelog.additional.retention.ms";
     private static final String 
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows 
maintainMs to ensure data is not deleted from the log prematurely. Allows for 
clock drift. Default is 1 day";
 
+    private static final String[] IGNORED_UNPREFIXED_CONSUMER_CONFIGS =
+        new String[] {ConsumerConfig.GROUP_PROTOCOL_CONFIG};
     private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
         new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
ConsumerConfig.GROUP_PROTOCOL_CONFIG};
     private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
@@ -1490,6 +1492,11 @@ public class StreamsConfig extends AbstractConfig {
         }
         
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
         verifyClientTelemetryConfigs();
+
+        if (doLog && 
getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT)))
 {
+            log.warn("The streams rebalance protocol is still in development 
and should not be used in production. "
+                + "Please set group.protocol=classic (default) in all 
production use cases.");
+        }
     }
 
     private void verifyEOSTransactionTimeoutCompatibility() {
@@ -1610,7 +1617,8 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private Map<String, Object> getCommonConsumerConfigs() {
-        final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
+        final Map<String, Object> clientProvidedProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames(),
+            IGNORED_UNPREFIXED_CONSUMER_CONFIGS);
 
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, 
NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
@@ -1923,12 +1931,20 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private Map<String, Object> getClientPropsWithPrefix(final String prefix,
-                                                         final Set<String> 
configNames) {
-        final Map<String, Object> props = clientProps(configNames, 
originals());
+                                                         final Set<String> 
configNames,
+                                                         final String[] 
ignoreFromGlobal) {
+        final Set<String> unprefixedConfigs = new HashSet<>(configNames);
+        Arrays.asList(ignoreFromGlobal).forEach(unprefixedConfigs::remove);
+        final Map<String, Object> props = clientProps(unprefixedConfigs, 
originals());
         props.putAll(originalsWithPrefix(prefix));
         return props;
     }
 
+    private Map<String, Object> getClientPropsWithPrefix(final String prefix,
+                                                         final Set<String> 
configNames) {
+        return getClientPropsWithPrefix(prefix, configNames, new String[0]);
+    }
+
     /**
      * Get a map of custom configs by removing from the originals all the 
Streams, Consumer, Producer, and AdminClient configs.
      * Prefixed properties are also removed because they are already added by 
{@link #getClientPropsWithPrefix(String, Set)}.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 4467e252b92..c59352a027b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1524,6 +1524,32 @@ public class StreamsConfigTest {
         }
     }
 
+    @Test
+    public void shouldHaveGroupConfigClassicDefault() {
+        streamsConfig = new StreamsConfig(props);
+        assertEquals("classic", 
streamsConfig.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG));
+    }
+
+    @Test
+    public void shouldLogWarningWhenStreamsProtocolIsUsed() {
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            logCaptureAppender.setClassLogger(StreamsConfig.class, Level.WARN);
+            streamsConfig = new StreamsConfig(props);
+            assertTrue(logCaptureAppender.getMessages().contains("The streams 
rebalance protocol is still in development and "
+                + "should not be used in production. Please set 
group.protocol=classic (default) in all production use cases."));
+        }
+        assertEquals("streams", 
streamsConfig.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG));
+    }
+
+    @Test
+    public void shouldNotApplyGroupConfigToConsumers() {
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+        streamsConfig = new StreamsConfig(props);
+        assertEquals("classic", streamsConfig.getMainConsumerConfigs("a", "b", 
threadIdx).get("group.protocol"));
+        assertEquals("classic", 
streamsConfig.getRestoreConsumerConfigs(clientId).get("group.protocol"));
+    }
+
     @SuppressWarnings("deprecation")
     @Test
     public void shouldUseOldProductionExceptionHandlerWhenOnlyOldConfigIsSet() 
{

Reply via email to