This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c3e7d62 KAFKA-6774; Improve the default group id behavior in
KafkaConsumer (KIP-289) (#5877)
c3e7d62 is described below
commit c3e7d6252c41d48e74379810595c978efada9efb
Author: Vahid Hashemian <[email protected]>
AuthorDate: Fri Nov 16 00:58:56 2018 -0800
KAFKA-6774; Improve the default group id behavior in KafkaConsumer
(KIP-289) (#5877)
Improve the default group id behavior by:
* changing the default consumer group id to null, in which case offset commit,
offset fetch, and group management operations are not allowed
* deprecating the use of the empty (`""`) consumer group id on the client
Reviewers: Jason Gustafson <[email protected]>
---
.../kafka/clients/consumer/ConsumerConfig.java | 2 +-
.../kafka/clients/consumer/KafkaConsumer.java | 109 ++++++++++-------
.../consumer/internals/AbstractCoordinator.java | 5 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 134 +++++++++++++++++----
.../ClientAuthenticationFailureTest.java | 1 +
.../kafka/api/IntegrationTestHarness.scala | 4 +-
.../kafka/api/PlaintextConsumerTest.scala | 128 +++++++++++++++++++-
docs/upgrade.html | 9 ++
8 files changed, 318 insertions(+), 74 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 795a762..9cd5766 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -277,7 +277,7 @@ public class ConsumerConfig extends AbstractConfig {
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
- .define(GROUP_ID_CONFIG, Type.STRING, "",
Importance.HIGH, GROUP_ID_DOC)
+ .define(GROUP_ID_CONFIG, Type.STRING, null,
Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
10000,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a75672..5c673a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
@@ -557,6 +559,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Logger log;
private final String clientId;
+ private String groupId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
@@ -654,18 +657,23 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
}
@SuppressWarnings("unchecked")
- private KafkaConsumer(ConsumerConfig config,
- Deserializer<K> keyDeserializer,
- Deserializer<V> valueDeserializer) {
+ private KafkaConsumer(ConsumerConfig config, Deserializer<K>
keyDeserializer, Deserializer<V> valueDeserializer) {
try {
String clientId =
config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" +
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
- String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
+ this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" +
clientId + ", groupId=" + groupId + "] ");
this.log = logContext.logger(getClass());
+ boolean enableAutoCommit =
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ if (groupId == null) { // overwrite in case of default group id
where the config is not explicitly provided
+ if
(!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+ enableAutoCommit = false;
+ else if (enableAutoCommit)
+ throw new
InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "
cannot be set to true when default group id (null) is used.");
+ } else if (groupId.isEmpty())
+ log.warn("Support for using the empty group id by consumers is
deprecated and will be removed in the next major release.");
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -678,8 +686,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters =
config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class,
- Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId));
+ MetricsReporter.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -691,16 +698,14 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
ConsumerInterceptor.class);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
- this.keyDeserializer =
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- Deserializer.class);
+ this.keyDeserializer =
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
- this.valueDeserializer =
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- Deserializer.class);
+ this.valueDeserializer =
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
@@ -710,17 +715,14 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
this.metadata = new Metadata(retryBackoffMs,
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(
- config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
- config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+ config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.bootstrap(addresses, time.milliseconds());
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new
ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder =
ClientUtils.createChannelBuilder(config, time);
-
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,
metricsRegistry.fetcherMetrics);
-
int heartbeatIntervalMs =
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
NetworkClient netClient = new NetworkClient(
@@ -755,24 +757,26 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
int maxPollIntervalMs =
config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
int sessionTimeoutMs =
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
- this.coordinator = new ConsumerCoordinator(logContext,
- this.client,
- groupId,
- maxPollIntervalMs,
- sessionTimeoutMs,
- new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs,
maxPollIntervalMs, retryBackoffMs),
- assignors,
- this.metadata,
- this.subscriptions,
- metrics,
- metricGrpPrefix,
- this.time,
- retryBackoffMs,
-
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors,
-
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
-
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+ // no coordinator will be constructed for the default (null) group
id
+ this.coordinator = groupId == null ? null :
+ new ConsumerCoordinator(logContext,
+ this.client,
+ groupId,
+ maxPollIntervalMs,
+ sessionTimeoutMs,
+ new Heartbeat(time, sessionTimeoutMs,
heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
+ assignors,
+ this.metadata,
+ this.subscriptions,
+ metrics,
+ metricGrpPrefix,
+ this.time,
+ retryBackoffMs,
+ enableAutoCommit,
+
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+ this.interceptors,
+
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
this.fetcher = new Fetcher<>(
logContext,
this.client,
@@ -795,11 +799,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
-
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
- // call close methods if internal objects are already constructed
- // this is to prevent resource leak. see KAFKA-2121
+ // call close methods if internal objects are already constructed;
this is to prevent resource leak. see KAFKA-2121
close(0, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
@@ -822,7 +824,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
long retryBackoffMs,
long requestTimeoutMs,
int defaultApiTimeoutMs,
- List<PartitionAssignor> assignors) {
+ List<PartitionAssignor> assignors,
+ String groupId) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
this.coordinator = coordinator;
@@ -839,6 +842,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
+ this.groupId = groupId;
}
/**
@@ -911,9 +915,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
public void subscribe(Collection<String> topics, ConsumerRebalanceListener
listener) {
acquireAndEnsureOpen();
try {
- if (topics == null) {
+ maybeThrowInvalidGroupIdException();
+ if (topics == null)
throw new IllegalArgumentException("Topic collection to
subscribe to cannot be null");
- } else if (topics.isEmpty()) {
+ if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as
unsubscribing
this.unsubscribe();
} else {
@@ -980,6 +985,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
{
+ maybeThrowInvalidGroupIdException();
if (pattern == null)
throw new IllegalArgumentException("Topic pattern to subscribe to
cannot be null");
@@ -1026,7 +1032,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
try {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
this.subscriptions.unsubscribe();
- this.coordinator.maybeLeaveGroup();
+ if (this.coordinator != null)
+ this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
log.info("Unsubscribed all topics or patterns and assigned
partitions");
} finally {
@@ -1073,7 +1080,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
// make sure the offsets of topic partitions the consumer is
unsubscribing from
// are committed since there will be no following rebalance
-
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+ if (coordinator != null)
+
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}",
Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
@@ -1211,7 +1219,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* Visible for testing
*/
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
- if (!coordinator.poll(timer)) {
+ if (coordinator != null && !coordinator.poll(timer)) {
return false;
}
@@ -1219,7 +1227,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
}
private Map<TopicPartition, List<ConsumerRecord<K, V>>>
pollForFetches(Timer timer) {
- long pollTimeout =
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()),
timer.remainingMs());
+ long pollTimeout = coordinator == null ? timer.remainingMs() :
+ Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()),
timer.remainingMs());
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
fetcher.fetchedRecords();
@@ -1249,7 +1258,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
// after the long poll, we should check whether the group needs to
rebalance
// prior to returning data so that the group can stabilize faster
- if (coordinator.rejoinNeededOrPending()) {
+ if (coordinator != null && coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
@@ -1324,6 +1333,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
public void commitSync(Duration timeout) {
acquireAndEnsureOpen();
try {
+ maybeThrowInvalidGroupIdException();
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(),
time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis()
+ "ms expired before successfully " +
"committing the current consumed offsets");
@@ -1406,6 +1416,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
public void commitSync(final Map<TopicPartition, OffsetAndMetadata>
offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
+ maybeThrowInvalidGroupIdException();
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets),
time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis()
+ "ms expired before successfully " +
"committing offsets " + offsets);
@@ -1475,6 +1486,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata>
offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
+ maybeThrowInvalidGroupIdException();
log.debug("Committing offsets: {}", offsets);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
} finally {
@@ -1686,6 +1698,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
public OffsetAndMetadata committed(TopicPartition partition, final
Duration timeout) {
acquireAndEnsureOpen();
try {
+ maybeThrowInvalidGroupIdException();
Map<TopicPartition, OffsetAndMetadata> offsets =
coordinator.fetchCommittedOffsets(
Collections.singleton(partition), time.timer(timeout));
if (offsets == null) {
@@ -2163,7 +2176,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
// coordinator lookup if there are partitions which have missing
positions, so
// a consumer with manually assigned partitions can avoid a
coordinator dependence
// by always ensuring that assigned partitions have an initial
position.
- if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
+ if (coordinator != null &&
!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
// If there are partitions still needing a position and a reset policy
is defined,
// request reset using the default policy. If no reset strategy is
defined and there
@@ -2216,6 +2229,12 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + "
configuration property");
}
+ private void maybeThrowInvalidGroupIdException() {
+ if (groupId == null)
+ throw new InvalidGroupIdException("To use the group management or
offset commit APIs, you must " +
+ "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in
the consumer configuration.");
+ }
+
// Visible for testing
String getClientId() {
return clientId;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d983087..335e0f2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -121,7 +121,7 @@ public abstract class AbstractCoordinator implements
Closeable {
private Generation generation = Generation.NO_GENERATION;
private RequestFuture<Void> findCoordinatorFuture = null;
-
+
/**
* Initialize the coordination manager.
*/
@@ -139,7 +139,8 @@ public abstract class AbstractCoordinator implements
Closeable {
this.log = logContext.logger(AbstractCoordinator.class);
this.client = client;
this.time = time;
- this.groupId = groupId;
+ this.groupId = Objects.requireNonNull(groupId,
+ "Expected a non-null group id for coordinator construction");
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.sessionTimeoutMs = sessionTimeoutMs;
this.leaveGroupOnClose = leaveGroupOnClose;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 987bad2..3657077 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
@@ -138,6 +140,8 @@ public class KafkaConsumerTest {
// a concurrent heartbeat request
private final int autoCommitIntervalMs = 500;
+ private final String groupId = "mock-group";
+
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -203,7 +207,7 @@ public class KafkaConsumerTest {
@Test
public void testSubscription() {
- KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
consumer.subscribe(singletonList(topic));
assertEquals(singleton(topic), consumer.subscription());
@@ -226,21 +230,21 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullTopicCollection() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe((List<String>) null);
}
}
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullTopic() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe(singletonList((String) null));
}
}
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnEmptyTopic() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
String emptyTopic = " ";
consumer.subscribe(singletonList(emptyTopic));
}
@@ -248,7 +252,7 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullPattern() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe((Pattern) null);
}
}
@@ -258,6 +262,7 @@ public class KafkaConsumerTest {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"");
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
consumer.subscribe(singletonList(topic));
}
@@ -265,7 +270,7 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSeekNegative() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String)
null)) {
consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
}
@@ -273,14 +278,14 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicPartition() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String)
null)) {
consumer.assign(null);
}
}
@Test
public void testAssignOnEmptyTopicPartition() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.assign(Collections.<TopicPartition>emptyList());
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
@@ -289,14 +294,14 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicInPartition() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String)
null)) {
consumer.assign(singleton(new TopicPartition(null, 0)));
}
}
@Test(expected = IllegalArgumentException.class)
public void testAssignOnEmptyTopicInPartition() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String)
null)) {
consumer.assign(singleton(new TopicPartition(" ", 0)));
}
}
@@ -328,7 +333,7 @@ public class KafkaConsumerTest {
@Test
public void testPause() {
- KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
consumer.assign(singletonList(tp0));
assertEquals(singleton(tp0), consumer.assignment());
@@ -346,11 +351,19 @@ public class KafkaConsumerTest {
consumer.close();
}
- private KafkaConsumer<byte[], byte[]> newConsumer() {
+ private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
+ return newConsumer(groupId, Optional.empty());
+ }
+
+ private KafkaConsumer<byte[], byte[]> newConsumer(String groupId,
Optional<Boolean> enableAutoCommit) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MockMetricsReporter.class.getName());
+ if (groupId != null)
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ if (enableAutoCommit.isPresent())
+ props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
enableAutoCommit.get().toString());
return newConsumer(props);
}
@@ -554,7 +567,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client,
metadata, assignor,
- OffsetResetStrategy.NONE, true);
+ OffsetResetStrategy.NONE, true, groupId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE,
node), node);
@@ -577,7 +590,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client,
metadata, assignor,
- OffsetResetStrategy.NONE, true);
+ OffsetResetStrategy.NONE, true, groupId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE,
node), node);
@@ -601,7 +614,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client,
metadata, assignor,
- OffsetResetStrategy.LATEST, true);
+ OffsetResetStrategy.LATEST, true, groupId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE,
node), node);
@@ -1210,14 +1223,14 @@ public class KafkaConsumerTest {
@Test(expected = IllegalStateException.class)
public void testPollWithNoSubscription() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String)
null)) {
consumer.poll(Duration.ZERO);
}
}
@Test(expected = IllegalStateException.class)
public void testPollWithEmptySubscription() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe(Collections.<String>emptyList());
consumer.poll(Duration.ZERO);
}
@@ -1225,7 +1238,7 @@ public class KafkaConsumerTest {
@Test(expected = IllegalStateException.class)
public void testPollWithEmptyUserAssignment() {
- try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+ try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.assign(Collections.<TopicPartition>emptySet());
consumer.poll(Duration.ZERO);
}
@@ -1265,13 +1278,78 @@ public class KafkaConsumerTest {
@Test
public void closeShouldBeIdempotent() {
- KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
consumer.close();
consumer.close();
consumer.close();
}
@Test
+ public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
+ try {
+ newConsumer(null, Optional.of(Boolean.TRUE));
+ fail("Expected an InvalidConfigurationException");
+ } catch (KafkaException e) {
+ assertEquals(InvalidConfigurationException.class,
e.getCause().getClass());
+ }
+
+ try {
+ newConsumer((String) null).subscribe(Collections.singleton(topic));
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+
+ try {
+ newConsumer((String) null).committed(tp0);
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+
+ try {
+ newConsumer((String) null).commitAsync();
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+
+ try {
+ newConsumer((String) null).commitSync();
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+ }
+
+ @Test
+ public void testOperationsByAssigningConsumerWithDefaultGroupId() {
+ KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+ consumer.assign(singleton(tp0));
+
+ try {
+ consumer.committed(tp0);
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+
+ try {
+ consumer.commitAsync();
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+
+ try {
+ consumer.commitSync();
+ fail("Expected an InvalidGroupIdException");
+ } catch (InvalidGroupIdException e) {
+ // OK, expected
+ }
+ }
+
+ @Test
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
@@ -1681,13 +1759,20 @@ public class KafkaConsumerTest {
Metadata metadata,
PartitionAssignor
assignor,
boolean
autoCommitEnabled) {
- return newConsumer(time, client, metadata, assignor,
OffsetResetStrategy.EARLIEST, autoCommitEnabled);
+ return newConsumer(time, client, metadata, assignor,
OffsetResetStrategy.EARLIEST, autoCommitEnabled, groupId);
}
private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
KafkaClient
client,
Metadata
metadata) {
- return newConsumer(time, client, metadata, new RangeAssignor(),
OffsetResetStrategy.EARLIEST, false);
+ return newConsumer(time, client, metadata, new RangeAssignor(),
OffsetResetStrategy.EARLIEST, false, groupId);
+ }
+
+ private KafkaConsumer<String, String> newConsumer(Time time,
+ KafkaClient client,
+ Metadata metadata,
+ String groupId) {
+ return newConsumer(time, client, metadata, new RangeAssignor(),
OffsetResetStrategy.LATEST, true, groupId);
}
private KafkaConsumer<String, String> newConsumer(Time time,
@@ -1695,9 +1780,9 @@ public class KafkaConsumerTest {
Metadata metadata,
PartitionAssignor
assignor,
OffsetResetStrategy
resetStrategy,
- boolean
autoCommitEnabled) {
+ boolean
autoCommitEnabled,
+ String groupId) {
String clientId = "mock-consumer";
- String groupId = "mock-group";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
int requestTimeoutMs = 30000;
@@ -1782,7 +1867,8 @@ public class KafkaConsumerTest {
retryBackoffMs,
requestTimeoutMs,
defaultApiTimeoutMs,
- assignors);
+ assignors,
+ groupId);
}
private static class FetchInfo {
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 938fe94..61606ab 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -84,6 +84,7 @@ public class ClientAuthenticationFailureTest {
public void testConsumerWithInvalidCredentials() {
Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
server.port());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
StringDeserializer deserializer = new StringDeserializer();
try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props, deserializer, deserializer)) {
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 0e2797a..5a20005 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -113,10 +113,12 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
def createConsumer[K, V](keyDeserializer: Deserializer[K] = new
ByteArrayDeserializer,
valueDeserializer: Deserializer[V] = new
ByteArrayDeserializer,
- configOverrides: Properties = new Properties):
KafkaConsumer[K, V] = {
+ configOverrides: Properties = new Properties,
+ configsToRemove: List[String] = List()):
KafkaConsumer[K, V] = {
val props = new Properties
props ++= consumerConfig
props ++= configOverrides
+ configsToRemove.foreach(props.remove(_))
val consumer = new KafkaConsumer[K, V](props, keyDeserializer,
valueDeserializer)
consumers += consumer
consumer
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c06a796..c11fc12 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
import java.time.Duration
import java.util
+import java.util.Arrays.asList
import java.util.regex.Pattern
import java.util.{Collections, Locale, Optional, Properties}
@@ -23,7 +24,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidGroupIdException,
InvalidTopicException}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
@@ -1814,4 +1815,129 @@ class PlaintextConsumerTest extends BaseConsumerTest {
s"The current assignment is ${consumer.assignment()}")
}
+ @Test
+ def testConsumingWithNullGroupId(): Unit = {
+ val topic = "test_topic"
+ val partition = 0;
+ val tp = new TopicPartition(topic, partition)
+ createTopic(topic, 1, 1)
+
+ TestUtils.waitUntilTrue(() => {
+ this.zkClient.topicExists(topic)
+ }, "Failed to create topic")
+
+ val producer = createProducer()
+ producer.send(new ProducerRecord(topic, partition, "k1".getBytes,
"v1".getBytes)).get()
+ producer.send(new ProducerRecord(topic, partition, "k2".getBytes,
"v2".getBytes)).get()
+ producer.send(new ProducerRecord(topic, partition, "k3".getBytes,
"v3".getBytes)).get()
+ producer.close()
+
+ // consumer 1: GROUP_ID_CONFIG removed (null group id), reset = "earliest"
+ val consumer1Config = new Properties(consumerConfig)
+ consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+ val consumer1 = createConsumer(
+ configOverrides = consumer1Config,
+ configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+ // consumer 2: GROUP_ID_CONFIG removed (null group id), reset = "latest"
+ val consumer2Config = new Properties(consumerConfig)
+ consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+ consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+ val consumer2 = createConsumer(
+ configOverrides = consumer2Config,
+ configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+ // consumer 3: GROUP_ID_CONFIG removed (null group id), seeks to offset 1
+ val consumer3Config = new Properties(consumerConfig)
+ consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
+ val consumer3 = createConsumer(
+ configOverrides = consumer3Config,
+ configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+ consumer1.assign(asList(tp))
+ consumer2.assign(asList(tp))
+ consumer3.assign(asList(tp))
+ consumer3.seek(tp, 1)
+
+ val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
+
+ try {
+ consumer1.commitSync()
+ fail("Expected offset commit to fail due to null group id")
+ } catch {
+ case e: InvalidGroupIdException => // expected: commit rejected without a group id
+ }
+
+ try {
+ consumer2.committed(tp)
+ fail("Expected committed offset fetch to fail due to null group id")
+ } catch {
+ case e: InvalidGroupIdException => // expected: fetch rejected without a group id
+ }
+
+ val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
+ val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
+
+ consumer1.unsubscribe()
+ consumer2.unsubscribe()
+ consumer3.unsubscribe()
+
+ consumer1.close()
+ consumer2.close()
+ consumer3.close()
+
+ assertEquals("Expected consumer1 to consume from earliest offset", 3,
numRecords1)
+ assertEquals("Expected consumer2 to consume from latest offset", 0,
numRecords2)
+ assertEquals("Expected consumer3 to consume from offset 1", 2, numRecords3)
+ }
+
+ @Test
+ def testConsumingWithEmptyGroupId(): Unit = {
+ val topic = "test_topic"
+ val partition = 0;
+ val tp = new TopicPartition(topic, partition)
+ createTopic(topic, 1, 1)
+
+ TestUtils.waitUntilTrue(() => {
+ this.zkClient.topicExists(topic)
+ }, "Failed to create topic")
+
+ val producer = createProducer()
+ producer.send(new ProducerRecord(topic, partition, "k1".getBytes,
"v1".getBytes)).get()
+ producer.send(new ProducerRecord(topic, partition, "k2".getBytes,
"v2".getBytes)).get()
+ producer.close()
+
+ // consumer 1: empty ("") group id, at most one record per poll
+ val consumer1Config = new Properties(consumerConfig)
+ consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+ consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+ consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+ val consumer1 = createConsumer(configOverrides = consumer1Config)
+
+ // consumer 2 uses the empty group id and consumes from latest offset if
there is no committed offset
+ val consumer2Config = new Properties(consumerConfig)
+ consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+ consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+ consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+ consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+ val consumer2 = createConsumer(configOverrides = consumer2Config)
+
+ consumer1.assign(asList(tp))
+ consumer2.assign(asList(tp))
+
+ val records1 = consumer1.poll(Duration.ofMillis(5000))
+ consumer1.commitSync()
+
+ val records2 = consumer2.poll(Duration.ofMillis(5000))
+ consumer2.commitSync()
+
+ consumer1.close()
+ consumer2.close()
+
+ assertTrue("Expected consumer1 to consume one message from offset 0",
+ records1.count() == 1 && records1.records(tp).asScala.head.offset == 0)
+ assertTrue("Expected consumer2 to consume one message from offset 1, which
is the committed offset of consumer1",
+ records2.count() == 1 && records2.records(tp).asScala.head.offset == 1)
+ }
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 33d9964..154547b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,15 @@
<script id="upgrade-template" type="text/x-handlebars-template">
+<h4><a id="upgrade_2_2_0" href="#upgrade_2_2_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to
2.2.0</a></h4>
+<h5><a id="upgrade_220_notable" href="#upgrade_220_notable">Notable changes in
2.2.0</a></h5>
+<ul>
+ <li>The default consumer group id has been changed from the empty string
(<code>""</code>) to <code>null</code>. Consumers who use the new default group
id will not be able to subscribe to topics,
+ or to fetch or commit offsets. The empty string as consumer group id is
deprecated but will be supported until a future major release. Old clients that
rely on the empty string group id will now
+ have to explicitly provide it as part of their consumer config. For
more information see
+ <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer">KIP-289</a>.</li>
+</ul>
+
<h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x,
0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
<p><b>Note that 2.1.x contains a change to the internal schema used to store
consumer offsets. Once the upgrade is