This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70dd577286d KAFKA-15909 Throw error when consumer configured with
empty/whitespace-only group.id for LegacyKafkaConsumer (#16933)
70dd577286d is described below
commit 70dd577286de31ef20dc4f198e95f9b9e4479b47
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Aug 30 23:24:36 2024 +0800
KAFKA-15909 Throw error when consumer configured with empty/whitespace-only
group.id for LegacyKafkaConsumer (#16933)
Per KIP-289, the use of an empty value for group.id configuration was
deprecated back in 2.2.0.
In 3.7, the AsyncKafkaConsumer implementation will throw an error (see
KAFKA-14438).
This task is to update the LegacyKafkaConsumer implementation to throw an
error in 4.0.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/ClassicKafkaConsumer.java | 9 ++-
.../kafka/clients/consumer/KafkaConsumerTest.java | 16 ++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 34 -----------
.../ClientAuthenticationFailureTest.java | 4 +-
.../kafka/api/PlaintextConsumerTest.scala | 65 +---------------------
5 files changed, 24 insertions(+), 104 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index b9e73f963a0..cb8e0b6dcb2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -154,17 +154,16 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
try {
GroupRebalanceConfig groupRebalanceConfig = new
GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);
+ if (groupRebalanceConfig.groupId != null &&
groupRebalanceConfig.groupId.isEmpty()) {
+ throw new InvalidGroupIdException("The configured " +
ConsumerConfig.GROUP_ID_CONFIG
+ + " should not be an empty string or whitespace.");
+ }
this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
this.clientId =
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
LogContext logContext = createLogContext(config,
groupRebalanceConfig);
this.log = logContext.logger(getClass());
boolean enableAutoCommit =
config.getBoolean(ENABLE_AUTO_COMMIT_CONFIG);
- groupId.ifPresent(groupIdStr -> {
- if (groupIdStr.isEmpty()) {
- log.warn("Support for using the empty group id by
consumers is deprecated and will be removed in the next major release.");
- }
- });
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ae81938e242..bff8f970d6d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -3326,6 +3326,22 @@ public void
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
assertEquals("Telemetry is not enabled. Set config
`enable.metrics.push` to `true`.", exception.getMessage());
}
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testEmptyGroupId(GroupProtocol groupProtocol) {
+ KafkaException e = assertThrows(KafkaException.class, () ->
newConsumer(groupProtocol, ""));
+ assertInstanceOf(InvalidGroupIdException.class, e.getCause());
+ assertEquals("The configured group.id should not be an empty string or
whitespace.", e.getCause().getMessage());
+ }
+
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testGroupIdWithWhitespace(GroupProtocol groupProtocol) {
+ KafkaException e = assertThrows(KafkaException.class, () ->
newConsumer(groupProtocol, " "));
+ assertInstanceOf(InvalidGroupIdException.class, e.getCause());
+ assertEquals("The configured group.id should not be an empty string or
whitespace.", e.getCause().getMessage());
+ }
+
private KafkaConsumer<String, String>
consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 4a2040fc129..ae694c12a09 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -182,12 +182,6 @@ public class AsyncKafkaConsumerTest {
return newConsumer(props);
}
- @SuppressWarnings("UnusedReturnValue")
- private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
- final Properties props = requiredConsumerConfigAndGroupId("");
- return newConsumer(props);
- }
-
private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
final ConsumerConfig config = new ConsumerConfig(props);
return newConsumer(config);
@@ -245,12 +239,6 @@ public class AsyncKafkaConsumerTest {
assertDoesNotThrow(() -> consumer.close());
}
- @Test
- public void testInvalidGroupId() {
- KafkaException e = assertThrows(KafkaException.class,
this::newConsumerWithEmptyGroupId);
- assertInstanceOf(InvalidGroupIdException.class, e.getCause());
- }
-
@Test
public void testFailOnClosedConsumer() {
consumer = newConsumer();
@@ -1850,16 +1838,6 @@ public class AsyncKafkaConsumerTest {
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
}
- @Test
- public void testGroupIdEmpty() {
- testInvalidGroupId("");
- }
-
- @Test
- public void testGroupIdOnlyWhitespaces() {
- testInvalidGroupId(" ");
- }
-
@Test
public void testEnsurePollEventSentOnConsumerPoll() {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), OffsetResetStrategy.NONE);
@@ -1882,18 +1860,6 @@ public class AsyncKafkaConsumerTest {
verify(applicationEventHandler).add(any(PollEvent.class));
}
- private void testInvalidGroupId(final String groupId) {
- final Properties props = requiredConsumerConfigAndGroupId(groupId);
- final ConsumerConfig config = new ConsumerConfig(props);
-
- final Exception exception = assertThrows(
- KafkaException.class,
- () -> consumer = newConsumer(config)
- );
-
- assertEquals("Failed to construct kafka consumer",
exception.getMessage());
- }
-
private Properties requiredConsumerConfigAndGroupId(final String groupId) {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index c13a0542fca..821f99a3e43 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
@@ -86,12 +87,11 @@ public class ClientAuthenticationFailureTest {
public void testConsumerWithInvalidCredentials() {
Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
server.port());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
StringDeserializer deserializer = new StringDeserializer();
try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props, deserializer, deserializer)) {
assertThrows(SaslAuthenticationException.class, () -> {
- consumer.subscribe(Collections.singleton(topic));
+ consumer.assign(Collections.singleton(new
TopicPartition(topic, 0)));
consumer.poll(Duration.ofSeconds(10));
});
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 0fbee055025..8a00feacb0b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -26,12 +26,12 @@ import
org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicExce
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
+import org.apache.kafka.common.{MetricName, TopicPartition}
import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.MethodSource
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.jdk.CollectionConverters._
@@ -720,67 +720,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertThrows(classOf[InvalidGroupIdException], () =>
consumer1.commitSync())
}
- // Empty group ID only supported for classic group protocol
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
- def testConsumingWithEmptyGroupId(quorum: String, groupProtocol: String):
Unit = {
- val topic = "test_topic"
- val partition = 0
- val tp = new TopicPartition(topic, partition)
- createTopic(topic)
-
- val producer = createProducer()
- producer.send(new ProducerRecord(topic, partition, "k1".getBytes,
"v1".getBytes)).get()
- producer.send(new ProducerRecord(topic, partition, "k2".getBytes,
"v2".getBytes)).get()
- producer.close()
-
- // consumer 1 uses the empty group id
- val consumer1Config = new Properties(consumerConfig)
- consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
- consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
- consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
- val consumer1 = createConsumer(configOverrides = consumer1Config)
-
- // consumer 2 uses the empty group id and consumes from latest offset if
there is no committed offset
- val consumer2Config = new Properties(consumerConfig)
- consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
- consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
- consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
- consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
- val consumer2 = createConsumer(configOverrides = consumer2Config)
-
- consumer1.assign(asList(tp))
- consumer2.assign(asList(tp))
-
- val records1 = consumer1.poll(Duration.ofMillis(5000))
- consumer1.commitSync()
-
- val records2 = consumer2.poll(Duration.ofMillis(5000))
- consumer2.commitSync()
-
- consumer1.close()
- consumer2.close()
-
- assertTrue(records1.count() == 1 &&
records1.records(tp).asScala.head.offset == 0,
- "Expected consumer1 to consume one message from offset 0")
- assertTrue(records2.count() == 1 &&
records2.records(tp).asScala.head.offset == 1,
- "Expected consumer2 to consume one message from offset 1, which is the
committed offset of consumer1")
- }
-
- // Empty group ID not supported with consumer group protocol
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @CsvSource(Array(
- "kraft+kip848, consumer"
- ))
- def testEmptyGroupIdNotSupported(quorum:String, groupProtocol: String): Unit
= {
- val consumer1Config = new Properties(consumerConfig)
- consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
- consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
- consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
-
- assertThrows(classOf[KafkaException], () => createConsumer(configOverrides
= consumer1Config))
- }
-
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(quorum:String,
groupProtocol: String): Unit = {