This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e6cff21 KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542) e6cff21 is described below commit e6cff21fd8c5add0eb7e55417a91f0530a7d3a32 Author: Dhruvil Shah <dhru...@confluent.io> AuthorDate: Wed May 8 09:31:05 2019 -0700 KAFKA-7320; Add consumer configuration to disable auto topic creation [KIP-361] (#5542) Implements KIP-361 to provide a consumer configuration to specify whether subscribing or assigning a non-existent topic would result in it being automatically created or not. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../kafka/clients/ManualMetadataUpdater.java | 10 +- .../java/org/apache/kafka/clients/Metadata.java | 45 +++------ .../org/apache/kafka/clients/MetadataUpdater.java | 8 +- .../org/apache/kafka/clients/NetworkClient.java | 14 ++- .../admin/internals/AdminMetadataManager.java | 3 +- .../kafka/clients/consumer/ConsumerConfig.java | 14 ++- .../kafka/clients/consumer/KafkaConsumer.java | 3 +- .../consumer/internals/ConsumerMetadata.java | 10 +- .../producer/internals/ProducerMetadata.java | 7 +- .../apache/kafka/clients/NetworkClientTest.java | 40 +++++++- .../kafka/clients/consumer/KafkaConsumerTest.java | 4 +- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 4 +- .../consumer/internals/ConsumerMetadataTest.java | 4 +- .../internals/ConsumerNetworkClientTest.java | 15 ++- .../clients/consumer/internals/FetcherTest.java | 2 +- .../internals/OffsetForLeaderEpochClientTest.java | 2 +- .../kafka/api/ConsumerTopicCreationTest.scala | 107 +++++++++++++++++++++ 18 files changed, 228 insertions(+), 66 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java index ec007a6..7fb0224 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; import org.slf4j.Logger; @@ -74,10 +74,10 @@ public class ManualMetadataUpdater implements MetadataUpdater { } @Override - public void handleAuthenticationFailure(AuthenticationException exception) { - // We don't fail the broker on authentication failures, but there is sufficient information in the broker logs - // to identify the failure. - log.debug("An authentication error occurred in broker-to-broker communication.", exception); + public void handleFatalException(KafkaException exception) { + // We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason + // for failure. + log.debug("An error occurred in broker-to-broker communication.", exception); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index ef01b4b..ae75045 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -66,8 +65,8 @@ public class Metadata implements Closeable { private int requestVersion; // bumped on every new topic addition private long lastRefreshMs; private long lastSuccessfulRefreshMs; - private AuthenticationException authenticationException; - private KafkaException metadataException; + private KafkaException fatalException; + private KafkaException recoverableException; private MetadataCache cache = MetadataCache.empty(); private boolean needUpdate; private final ClusterResourceListeners clusterResourceListeners; @@ -202,25 +201,13 @@ public class Metadata implements Closeable { } /** - * If any non-retriable authentication exceptions were encountered during - * metadata update, clear and return the exception. + * If any non-retriable exceptions were encountered during metadata update, clear and return the exception. */ - public synchronized AuthenticationException getAndClearAuthenticationException() { - if (authenticationException != null) { - AuthenticationException exception = authenticationException; - authenticationException = null; - return exception; - } else - return null; - } - - synchronized KafkaException getAndClearMetadataException() { - if (this.metadataException != null) { - KafkaException metadataException = this.metadataException; - this.metadataException = null; - return metadataException; - } else - return null; + public synchronized KafkaException getAndClearMetadataException() { + KafkaException metadataException = Optional.ofNullable(fatalException).orElse(recoverableException); + fatalException = null; + recoverableException = null; + return metadataException; } public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) { @@ -281,7 +268,7 @@ public class Metadata implements Closeable { private void maybeSetMetadataError(Cluster cluster) { // if we encounter any invalid topics, cache the exception to later throw to the user - metadataException = null; + recoverableException = null; checkInvalidTopics(cluster); checkUnauthorizedTopics(cluster); } @@ -289,14 +276,16 @@ public class Metadata implements Closeable { private void checkInvalidTopics(Cluster cluster) { if (!cluster.invalidTopics().isEmpty()) { log.error("Metadata response reported invalid topics {}", cluster.invalidTopics()); - metadataException = new InvalidTopicException(cluster.invalidTopics()); + // We may be able to recover from this exception if metadata for this topic is no longer needed + recoverableException = new InvalidTopicException(cluster.invalidTopics()); } } private void checkUnauthorizedTopics(Cluster cluster) { if (!cluster.unauthorizedTopics().isEmpty()) { log.error("Topic authorization failed for topics {}", cluster.unauthorizedTopics()); - metadataException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics())); + // We may be able to recover from this exception if metadata for this topic is no longer needed + recoverableException = new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics())); } } @@ -368,10 +357,6 @@ public class Metadata implements Closeable { } public synchronized void maybeThrowException() { - AuthenticationException authenticationException = getAndClearAuthenticationException(); - if (authenticationException != null) - throw authenticationException; - KafkaException metadataException = getAndClearMetadataException(); if (metadataException != null) throw metadataException; @@ -381,9 +366,9 @@ public class Metadata implements Closeable { * Record an attempt to update the metadata that failed. We need to keep track of this * to avoid retrying immediately. */ - public synchronized void failedUpdate(long now, AuthenticationException authenticationException) { + public synchronized void failedUpdate(long now, KafkaException fatalException) { this.lastRefreshMs = now; - this.authenticationException = authenticationException; + this.fatalException = fatalException; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index de765db..e2261d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; @@ -64,11 +64,11 @@ public interface MetadataUpdater extends Closeable { void handleDisconnection(String destination); /** - * Handle authentication failure. Propagate the authentication exception if awaiting metadata. + * Handle failure. Propagate the exception if awaiting metadata. * - * @param exception authentication exception from broker + * @param fatalException exception corresponding to the failure */ - void handleAuthenticationFailure(AuthenticationException exception); + void handleFatalException(KafkaException fatalException); /** * Handle responses for metadata requests. diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 4f69256..48693f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; @@ -433,8 +434,8 @@ public class NetworkClient implements KafkaClient { doSend(request, false, now); } - private void sendInternalMetadataRequest(MetadataRequest.Builder builder, - String nodeConnectionId, long now) { + // package-private for testing + void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) { ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true); doSend(clientRequest, true, now); } @@ -480,6 +481,9 @@ public class NetworkClient implements KafkaClient { clientRequest.callback(), clientRequest.destination(), now, now, false, unsupportedVersionException, null, null); abortedSends.add(clientResponse); + + if (isInternalRequest && clientRequest.apiKey() == ApiKeys.METADATA) + metadataUpdater.handleFatalException(unsupportedVersionException); } } @@ -715,7 +719,7 @@ public class NetworkClient implements KafkaClient { case AUTHENTICATION_FAILED: AuthenticationException exception = disconnectState.exception(); connectionStates.authenticationFailed(nodeId, now, exception); - metadataUpdater.handleAuthenticationFailure(exception); + metadataUpdater.handleFatalException(exception); log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId, disconnectState.remoteAddress(), exception.getMessage()); break; @@ -1005,9 +1009,9 @@ public class NetworkClient implements KafkaClient { } @Override - public void handleAuthenticationFailure(AuthenticationException exception) { + public void handleFatalException(KafkaException fatalException) { if (metadata.updateRequested()) - metadata.failedUpdate(time.milliseconds(), exception); + metadata.failedUpdate(time.milliseconds(), fatalException); inProgressRequestVersion = null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java index 3d9e5ca..b7080ac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals; import org.apache.kafka.clients.MetadataUpdater; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.requests.MetadataResponse; @@ -104,7 +105,7 @@ public class AdminMetadataManager { } @Override - public void handleAuthenticationFailure(AuthenticationException e) { + public void handleFatalException(KafkaException e) { updateFailed(e); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index e285ade..c9b5004 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -261,6 +261,14 @@ public class ConsumerConfig extends AbstractConfig { public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT); + /** <code>allow.auto.create.topics</code> */ + public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics"; + private static final String ALLOW_AUTO_CREATE_TOPICS_DOC = "Allow automatic topic creation on the broker when" + + " subscribing to or assigning a topic. A topic being subscribed to will be automatically created only if the" + + " broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" + + " be set to `false` when using brokers older than 0.11.0"; + public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -464,6 +472,11 @@ public class ConsumerConfig extends AbstractConfig { in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)), Importance.MEDIUM, ISOLATION_LEVEL_DOC) + .define(ALLOW_AUTO_CREATE_TOPICS_CONFIG, + Type.BOOLEAN, + DEFAULT_ALLOW_AUTO_CREATE_TOPICS, + Importance.MEDIUM, + ALLOW_AUTO_CREATE_TOPICS_DOC) // security support .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, @@ -472,7 +485,6 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.SECURITY_PROTOCOL_DOC) .withClientSslSupport() .withClientSaslSupport(); - } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5c8c1dc..3bfd5ac 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -727,6 +727,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.metadata = new ConsumerMetadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), !config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), + config.getBoolean(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), subscriptions, logContext, clusterResourceListeners); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)); @@ -1830,7 +1831,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { Timer timer = time.timer(timeout); Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topic), true), timer); + new MetadataRequest.Builder(Collections.singletonList(topic), metadata.allowAutoTopicCreation()), timer); return topicMetadata.get(topic); } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java index c87849d..fbdf1c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java @@ -28,21 +28,28 @@ import java.util.Set; public class ConsumerMetadata extends Metadata { private final boolean includeInternalTopics; + private final boolean allowAutoTopicCreation; private final SubscriptionState subscription; private final Set<String> transientTopics; public ConsumerMetadata(long refreshBackoffMs, long metadataExpireMs, boolean includeInternalTopics, + boolean allowAutoTopicCreation, SubscriptionState subscription, LogContext logContext, ClusterResourceListeners clusterResourceListeners) { super(refreshBackoffMs, metadataExpireMs, logContext, clusterResourceListeners); this.includeInternalTopics = includeInternalTopics; + this.allowAutoTopicCreation = allowAutoTopicCreation; this.subscription = subscription; this.transientTopics = new HashSet<>(); } + public boolean allowAutoTopicCreation() { + return allowAutoTopicCreation; + } + @Override public synchronized MetadataRequest.Builder newMetadataRequestBuilder() { if (subscription.hasPatternSubscription()) @@ -50,7 +57,7 @@ public class ConsumerMetadata extends Metadata { List<String> topics = new ArrayList<>(); topics.addAll(subscription.groupSubscription()); topics.addAll(transientTopics); - return new MetadataRequest.Builder(topics, true); + return new MetadataRequest.Builder(topics, allowAutoTopicCreation); } synchronized void addTransientTopics(Set<String> topics) { @@ -73,5 +80,4 @@ public class ConsumerMetadata extends Metadata { return subscription.matchesSubscribedPattern(topic); } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java index 90e7970..295036b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java @@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; @@ -111,9 +110,9 @@ public class ProducerMetadata extends Metadata { } @Override - public synchronized void failedUpdate(long now, AuthenticationException authenticationException) { - super.failedUpdate(now, authenticationException); - if (authenticationException != null) + public synchronized void failedUpdate(long now, KafkaException fatalException) { + super.failedUpdate(now, fatalException); + if (fatalException != null) notifyAll(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 4908eb3..a31c38a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -16,8 +16,10 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.CommonFields; @@ -58,25 +60,26 @@ public class NetworkClientTest { protected final long reconnectBackoffMsTest = 10 * 1000; protected final long reconnectBackoffMaxMsTest = 10 * 10000; + private final TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(Collections.singletonList(node)); private final NetworkClient client = createNetworkClient(reconnectBackoffMaxMsTest); private final NetworkClient clientWithNoExponentialBackoff = createNetworkClient(reconnectBackoffMsTest); private final NetworkClient clientWithStaticNodes = createNetworkClientWithStaticNodes(); private final NetworkClient clientWithNoVersionDiscovery = createNetworkClientWithNoVersionDiscovery(); private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) { - return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE, + return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext()); } private NetworkClient createNetworkClientWithStaticNodes() { - return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), + return new NetworkClient(selector, metadataUpdater, "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext()); } private NetworkClient createNetworkClientWithNoVersionDiscovery() { - return new NetworkClient(selector, new ManualMetadataUpdater(Collections.singletonList(node)), "mock", Integer.MAX_VALUE, + return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024, defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext()); @@ -140,6 +143,16 @@ public class NetworkClientTest { assertFalse("Connection should not be ready after close", client.isReady(node, 0)); } + @Test + public void testUnsupportedVersionDuringInternalMetadataRequest() { + List<String> topics = Arrays.asList("topic_1"); + + // disabling auto topic creation for versions less than 4 is not supported + MetadataRequest.Builder builder = new MetadataRequest.Builder(topics, false, (short) 3); + client.sendInternalMetadataRequest(builder, node.idString(), time.milliseconds()); + assertEquals(UnsupportedVersionException.class, metadataUpdater.getAndClearFailure().getClass()); + } + private void checkSimpleRequestResponse(NetworkClient networkClient) { awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, @@ -583,4 +596,25 @@ public class NetworkClientTest { this.response = response; } } + + // ManualMetadataUpdater with ability to keep track of failures + private static class TestMetadataUpdater extends ManualMetadataUpdater { + KafkaException failure; + + public TestMetadataUpdater(List<Node> nodes) { + super(nodes); + } + + @Override + public void handleFatalException(KafkaException exception) { + failure = exception; + super.handleFatalException(exception); + } + + public KafkaException getAndClearFailure() { + KafkaException failure = this.failure; + this.failure = null; + return failure; + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 524ee25..3a4076b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1656,8 +1656,8 @@ public class KafkaConsumerTest { } private ConsumerMetadata createMetadata(SubscriptionState subscription) { - return new ConsumerMetadata(0, Long.MAX_VALUE, false, subscription, - new LogContext(), new ClusterResourceListeners()); + return new ConsumerMetadata(0, Long.MAX_VALUE, false, false, + subscription, new LogContext(), new ClusterResourceListeners()); } private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 93c074b..4ce0386 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -96,7 +96,7 @@ public class AbstractCoordinatorTest { LogContext logContext = new LogContext(); this.mockTime = new MockTime(); ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 60 * 60 * 1000L, - false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), + false, false, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), logContext, new ClusterResourceListeners()); this.mockClient = new MockClient(mockTime, metadata); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index a83df5e..4f6e0f1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -139,7 +139,7 @@ public class ConsumerCoordinatorTest { LogContext logContext = new LogContext(); this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST); this.metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, - subscriptions, logContext, new ClusterResourceListeners()); + false, subscriptions, logContext, new ClusterResourceListeners()); this.client = new MockClient(time, metadata); this.client.updateMetadata(metadataResponse); this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, @@ -1181,7 +1181,7 @@ public class ConsumerCoordinatorTest { private void testInternalTopicInclusion(boolean includeInternalTopics) { metadata = new ConsumerMetadata(0, Long.MAX_VALUE, includeInternalTopics, - subscriptions, new LogContext(), new ClusterResourceListeners()); + false, subscriptions, new LogContext(), new ClusterResourceListeners()); client = new MockClient(time, metadata); coordinator = buildCoordinator(new Metrics(), assignors, false, Optional.empty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java index d97887a..86740f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java @@ -159,8 +159,8 @@ public class ConsumerMetadataTest { private ConsumerMetadata newConsumerMetadata(boolean includeInternalTopics) { long refreshBackoffMs = 50; long expireMs = 50000; - return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, subscription, new LogContext(), - new ClusterResourceListeners()); + return new ConsumerMetadata(refreshBackoffMs, expireMs, includeInternalTopics, false, + subscription, new LogContext(), new ClusterResourceListeners()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 14c2cba..1b7f8fb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.DisconnectException; @@ -238,7 +239,7 @@ public class ConsumerNetworkClientTest { fail("Expected authentication error thrown"); } catch (AuthenticationException e) { // After the exception is raised, it should have been cleared - assertNull(metadata.getAndClearAuthenticationException()); + assertNull(metadata.getAndClearMetadataException()); } } @@ -259,6 +260,18 @@ public class ConsumerNetworkClientTest { } @Test + public void testMetadataFailurePropagated() { + KafkaException metadataException = new KafkaException(); + metadata.failedUpdate(time.milliseconds(), metadataException); + try { + consumerClient.poll(time.timer(Duration.ZERO)); + fail("Expected poll to throw exception"); + } catch (Exception e) { + assertEquals(metadataException, e); + } + } + + @Test public void testFutureCompletionOutsidePoll() throws Exception { // Tests the scenario in which the request that is being awaited in one thread // is received and completed in another thread. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 6a0a4f3..30c9a04 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -3395,7 +3395,7 @@ public class FetcherTest { LogContext logContext = new LogContext(); time = new MockTime(1); subscriptions = new SubscriptionState(logContext, offsetResetStrategy); - metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, + metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); metrics = new Metrics(metricConfig, time); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java index ee00e48..55b8754 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetForLeaderEpochClientTest.java @@ -157,7 +157,7 @@ public class OffsetForLeaderEpochClientTest { LogContext logContext = new LogContext(); time = new MockTime(1); subscriptions = new SubscriptionState(logContext, offsetResetStrategy); - metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, + metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false, subscriptions, logContext, new ClusterResourceListeners()); client = new MockClient(time, metadata); consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala new file mode 100644 index 0000000..11fbefd --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.kafka.api + +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.runners.Parameterized.Parameters +import java.lang.{Boolean => JBoolean} +import java.time.Duration +import java.util + +import scala.collection.JavaConverters._ +import kafka.api.IntegrationTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.junit.{After, Test} + +/** + * Tests behavior of specifying auto topic creation configuration for the consumer and broker + */ +@RunWith(value = classOf[Parameterized]) +class ConsumerTopicCreationTest(brokerAutoTopicCreationEnable: JBoolean, consumerAllowAutoCreateTopics: JBoolean) extends IntegrationTestHarness { + override protected def brokerCount: Int = 1 + + val topic = "topic" + val part = 0 + val tp = new TopicPartition(topic, part) + val producerClientId = "ConsumerTestProducer" + val consumerClientId = "ConsumerTestConsumer" + var adminClient: AdminClient = null + + // configure server properties + this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown + this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, brokerAutoTopicCreationEnable.toString) + + // configure client properties + this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId) + this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId) + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") + this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") + this.consumerConfig.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, consumerAllowAutoCreateTopics.toString) + + @After + override def tearDown(): Unit = { + if (adminClient != null) + Utils.closeQuietly(adminClient, "AdminClient") + super.tearDown() + } + + @Test + def testAutoTopicCreation(): Unit = { + val consumer = createConsumer() + adminClient = AdminClient.create(createConfig()) + + consumer.subscribe(util.Arrays.asList(topic)) + consumer.poll(Duration.ofMillis(100)) + + val topicCreated = adminClient.listTopics.names.get.contains(topic) + if (brokerAutoTopicCreationEnable && consumerAllowAutoCreateTopics) + assert(topicCreated == true) + else + assert(topicCreated == false) + } + + def createConfig(): util.Map[String, Object] = { + val config = new util.HashMap[String, Object] + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") + val securityProps: util.Map[Object, Object] = + TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) + securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) } + config + } +} + +object ConsumerTopicCreationTest { + @Parameters(name = "brokerTopicCreation={0}, consumerTopicCreation={1}") + def parameters: java.util.Collection[Array[Object]] = { + val data = new java.util.ArrayList[Array[Object]]() + for (brokerAutoTopicCreationEnable <- Array(JBoolean.TRUE, JBoolean.FALSE)) + for (consumerAutoCreateTopicsPolicy <- Array(JBoolean.TRUE, JBoolean.FALSE)) + data.add(Array(brokerAutoTopicCreationEnable, consumerAutoCreateTopicsPolicy)) + data + } +}