This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70dd577286d KAFKA-15909 Throw error when consumer configured with 
empty/whitespace-only group.id for LegacyKafkaConsumer (#16933)
70dd577286d is described below

commit 70dd577286de31ef20dc4f198e95f9b9e4479b47
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Aug 30 23:24:36 2024 +0800

    KAFKA-15909 Throw error when consumer configured with empty/whitespace-only 
group.id for LegacyKafkaConsumer (#16933)
    
    Per KIP-289, the use of an empty value for group.id configuration was 
deprecated back in 2.2.0.
    
    In 3.7, the AsyncKafkaConsumer implementation already throws an error (see 
KAFKA-14438).
    
    This change updates the LegacyKafkaConsumer implementation (the 
ClassicKafkaConsumer class) to throw an InvalidGroupIdException as well, 
starting in 4.0.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/ClassicKafkaConsumer.java   |  9 ++-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 16 ++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 34 -----------
 .../ClientAuthenticationFailureTest.java           |  4 +-
 .../kafka/api/PlaintextConsumerTest.scala          | 65 +---------------------
 5 files changed, 24 insertions(+), 104 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index b9e73f963a0..cb8e0b6dcb2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -154,17 +154,16 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(config,
                     GroupRebalanceConfig.ProtocolType.CONSUMER);
+            if (groupRebalanceConfig.groupId != null && 
groupRebalanceConfig.groupId.isEmpty()) {
+                throw new InvalidGroupIdException("The configured " + 
ConsumerConfig.GROUP_ID_CONFIG
+                        + " should not be an empty string or whitespace.");
+            }
 
             this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
             this.clientId = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             LogContext logContext = createLogContext(config, 
groupRebalanceConfig);
             this.log = logContext.logger(getClass());
             boolean enableAutoCommit = 
config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
-            groupId.ifPresent(groupIdStr -> {
-                if (groupIdStr.isEmpty()) {
-                    log.warn("Support for using the empty group id by 
consumers is deprecated and will be removed in the next major release.");
-                }
-            });
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ae81938e242..bff8f970d6d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -3326,6 +3326,22 @@ public void 
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
         assertEquals("Telemetry is not enabled. Set config 
`enable.metrics.push` to `true`.", exception.getMessage());
     }
 
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testEmptyGroupId(GroupProtocol groupProtocol) {
+        KafkaException e = assertThrows(KafkaException.class, () -> 
newConsumer(groupProtocol, ""));
+        assertInstanceOf(InvalidGroupIdException.class, e.getCause());
+        assertEquals("The configured group.id should not be an empty string or 
whitespace.", e.getCause().getMessage());
+    }
+
+    @ParameterizedTest
+    @EnumSource(GroupProtocol.class)
+    public void testGroupIdWithWhitespace(GroupProtocol groupProtocol) {
+        KafkaException e = assertThrows(KafkaException.class, () -> 
newConsumer(groupProtocol, " "));
+        assertInstanceOf(InvalidGroupIdException.class, e.getCause());
+        assertEquals("The configured group.id should not be an empty string or 
whitespace.", e.getCause().getMessage());
+    }
+
     private KafkaConsumer<String, String> 
consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 4a2040fc129..ae694c12a09 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -182,12 +182,6 @@ public class AsyncKafkaConsumerTest {
         return newConsumer(props);
     }
 
-    @SuppressWarnings("UnusedReturnValue")
-    private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
-        final Properties props = requiredConsumerConfigAndGroupId("");
-        return newConsumer(props);
-    }
-
     private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
         final ConsumerConfig config = new ConsumerConfig(props);
         return newConsumer(config);
@@ -245,12 +239,6 @@ public class AsyncKafkaConsumerTest {
         assertDoesNotThrow(() -> consumer.close());
     }
 
-    @Test
-    public void testInvalidGroupId() {
-        KafkaException e = assertThrows(KafkaException.class, 
this::newConsumerWithEmptyGroupId);
-        assertInstanceOf(InvalidGroupIdException.class, e.getCause());
-    }
-
     @Test
     public void testFailOnClosedConsumer() {
         consumer = newConsumer();
@@ -1850,16 +1838,6 @@ public class AsyncKafkaConsumerTest {
         
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
     }
 
-    @Test
-    public void testGroupIdEmpty() {
-        testInvalidGroupId("");
-    }
-
-    @Test
-    public void testGroupIdOnlyWhitespaces() {
-        testInvalidGroupId("       ");
-    }
-
     @Test
     public void testEnsurePollEventSentOnConsumerPoll() {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.NONE);
@@ -1882,18 +1860,6 @@ public class AsyncKafkaConsumerTest {
         verify(applicationEventHandler).add(any(PollEvent.class));
     }
 
-    private void testInvalidGroupId(final String groupId) {
-        final Properties props = requiredConsumerConfigAndGroupId(groupId);
-        final ConsumerConfig config = new ConsumerConfig(props);
-
-        final Exception exception = assertThrows(
-            KafkaException.class,
-            () -> consumer = newConsumer(config)
-        );
-
-        assertEquals("Failed to construct kafka consumer", 
exception.getMessage());
-    }
-
     private Properties requiredConsumerConfigAndGroupId(final String groupId) {
         final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index c13a0542fca..821f99a3e43 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
@@ -86,12 +87,11 @@ public class ClientAuthenticationFailureTest {
     public void testConsumerWithInvalidCredentials() {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
server.port());
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
         StringDeserializer deserializer = new StringDeserializer();
 
         try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props, deserializer, deserializer)) {
             assertThrows(SaslAuthenticationException.class, () -> {
-                consumer.subscribe(Collections.singleton(topic));
+                consumer.assign(Collections.singleton(new 
TopicPartition(topic, 0)));
                 consumer.poll(Duration.ofSeconds(10));
             });
         }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 0fbee055025..8a00feacb0b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -26,12 +26,12 @@ import 
org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicExce
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
+import org.apache.kafka.common.{MetricName, TopicPartition}
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.MethodSource
 
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import scala.jdk.CollectionConverters._
@@ -720,67 +720,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertThrows(classOf[InvalidGroupIdException], () => 
consumer1.commitSync())
   }
 
-  // Empty group ID only supported for classic group protocol
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
-  def testConsumingWithEmptyGroupId(quorum: String, groupProtocol: String): 
Unit = {
-    val topic = "test_topic"
-    val partition = 0
-    val tp = new TopicPartition(topic, partition)
-    createTopic(topic)
-
-    val producer = createProducer()
-    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, 
"v1".getBytes)).get()
-    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, 
"v2".getBytes)).get()
-    producer.close()
-
-    // consumer 1 uses the empty group id
-    val consumer1Config = new Properties(consumerConfig)
-    consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
-    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
-    consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
-    val consumer1 = createConsumer(configOverrides = consumer1Config)
-
-    // consumer 2 uses the empty group id and consumes from latest offset if 
there is no committed offset
-    val consumer2Config = new Properties(consumerConfig)
-    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
-    consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
-    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
-    consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
-    val consumer2 = createConsumer(configOverrides = consumer2Config)
-
-    consumer1.assign(asList(tp))
-    consumer2.assign(asList(tp))
-
-    val records1 = consumer1.poll(Duration.ofMillis(5000))
-    consumer1.commitSync()
-
-    val records2 = consumer2.poll(Duration.ofMillis(5000))
-    consumer2.commitSync()
-
-    consumer1.close()
-    consumer2.close()
-
-    assertTrue(records1.count() == 1 && 
records1.records(tp).asScala.head.offset == 0,
-      "Expected consumer1 to consume one message from offset 0")
-    assertTrue(records2.count() == 1 && 
records2.records(tp).asScala.head.offset == 1,
-      "Expected consumer2 to consume one message from offset 1, which is the 
committed offset of consumer1")
-  }
-
-  // Empty group ID not supported with consumer group protocol
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @CsvSource(Array(
-    "kraft+kip848, consumer"
-  ))
-  def testEmptyGroupIdNotSupported(quorum:String, groupProtocol: String): Unit 
= {
-    val consumer1Config = new Properties(consumerConfig)
-    consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
-    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
-
-    assertThrows(classOf[KafkaException], () => createConsumer(configOverrides 
= consumer1Config))
-  }
-
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(quorum:String, 
groupProtocol: String): Unit = {

Reply via email to