This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6a5992a KAFKA-8531: Change default replication factor config (#10532)
6a5992a is described below
commit 6a5992a814364a41ce7ad34aa6b27cc44d0eb78c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed May 5 16:16:06 2021 -0700
KAFKA-8531: Change default replication factor config (#10532)
Implements KIP-733
Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 6 +--
docs/streams/upgrade-guide.html | 7 ++-
.../org/apache/kafka/streams/StreamsConfig.java | 14 +++---
.../processor/internals/InternalTopicManager.java | 30 ++++++++++--
.../internals/InternalTopicManagerTest.java | 57 ++++++++++++++++++++++
5 files changed, 99 insertions(+), 15 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 1f5002f..1f8e8f3 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -304,10 +304,10 @@
<td>100 milliseconds</td>
</tr>
<tr class="row-even"><td>replication.factor</td>
- <td>High</td>
+ <td>Medium</td>
<td colspan="2">The replication factor for changelog topics and
repartition topics created by the application.
- If your broker cluster is on version 2.4 or newer, you can set
-1 to use the broker default replication factor.</td>
- <td>1</td>
+ The default of <code>-1</code> (meaning: use broker default
replication factor) requires broker version 2.4 or newer.</td>
+ <td>-1</td>
</tr>
<tr class="row-even"><td>retry.backoff.ms</td>
<td>Medium</td>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1990d72..6f13cc5 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -144,7 +144,12 @@
<li>Connect-json: As of Kafka Streams no longer has a compile time
dependency on "connect:json" module (<a
href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have
to explicitly declare it.</li>
</ul>
-
+ <p>
+ The default value for configuration parameter
<code>replication.factor</code> was changed to <code>-1</code>
+ (meaning: use broker default replication factor).
+ The <code>replication.factor</code> value of <code>-1</code> requires
broker version 2.4 or newer.
+ </p>
+
<h3><a id="streams_api_changes_280"
href="#streams_api_changes_280">Streams API changes in 2.8.0</a></h3>
<p>
We extended <code>StreamJoined</code> to include the options
<code>withLoggingEnabled()</code> and <code>withLoggingDisabled()</code> in
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 89154c7..0e9e6a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -501,8 +501,8 @@ public class StreamsConfig extends AbstractConfig {
/** {@code replication.factor} */
@SuppressWarnings("WeakerAccess")
public static final String REPLICATION_FACTOR_CONFIG =
"replication.factor";
- private static final String REPLICATION_FACTOR_DOC = "The replication
factor for change log topics and repartition topics created by the stream
processing application." +
- " If your broker cluster is on version 2.4 or newer, you can set -1 to
use the broker default replication factor.";
+ private static final String REPLICATION_FACTOR_DOC = "The replication
factor for change log topics and repartition topics created by the stream
processing application."
+ + " The default of <code>-1</code> (meaning: use broker default
replication factor) requires broker version 2.4 or newer";
/** {@code request.timeout.ms} */
@SuppressWarnings("WeakerAccess")
@@ -608,11 +608,6 @@ public class StreamsConfig extends AbstractConfig {
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
- .define(REPLICATION_FACTOR_CONFIG,
- Type.INT,
- 1,
- Importance.HIGH,
- REPLICATION_FACTOR_DOC)
.define(STATE_DIR_CONFIG,
Type.STRING,
System.getProperty("java.io.tmpdir") + File.separator +
"kafka-streams",
@@ -700,6 +695,11 @@ public class StreamsConfig extends AbstractConfig {
in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA,
EXACTLY_ONCE_V2),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC)
+ .define(REPLICATION_FACTOR_CONFIG,
+ Type.INT,
+ -1,
+ Importance.MEDIUM,
+ REPLICATION_FACTOR_DOC)
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 2ca130a..c612245 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -439,13 +440,34 @@ public class InternalTopicManager {
final Throwable cause = executionException.getCause();
if (cause instanceof TopicExistsException) {
// This topic didn't exist earlier or its leader
not known before; just retain it for next round of validation.
- log.info("Could not create topic {}. Topic is
probably marked for deletion (number of partitions is unknown).\n" +
- "Will retry to create this topic in {} ms (to
let broker finish async delete operation first).\n" +
- "Error message was: {}", topicName,
retryBackOffMs, cause.toString());
+ log.info(
+ "Could not create topic {}. Topic is probably
marked for deletion (number of partitions is unknown).\n"
+ +
+ "Will retry to create this topic in {} ms
(to let broker finish async delete operation first).\n"
+ +
+ "Error message was: {}", topicName,
retryBackOffMs,
+ cause.toString());
} else {
log.error("Unexpected error during topic creation
for {}.\n" +
"Error message was: {}", topicName,
cause.toString());
- throw new StreamsException(String.format("Could
not create topic %s.", topicName), cause);
+
+ if (cause instanceof UnsupportedVersionException) {
+ final String errorMessage = cause.getMessage();
+ if (errorMessage != null &&
+ errorMessage.startsWith("Creating topics
with default partitions/replication factor are only supported in
CreateTopicRequest version 4+")) {
+
+ throw new StreamsException(String.format(
+ "Could not create topic %s, because
brokers don't support configuration replication.factor=-1."
+ + " You can change the
replication.factor config or upgrade your brokers to version 2.4 or newer to
avoid this error.",
+ topicName)
+ );
+ }
+ } else {
+ throw new StreamsException(
+ String.format("Could not create topic
%s.", topicName),
+ cause
+ );
+ }
}
} catch (final TimeoutException retriableException) {
log.error("Creating topic {} timed out.\n" +
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 653250c..5b2b64e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
@@ -40,7 +41,13 @@ import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@@ -53,6 +60,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -223,6 +231,55 @@ public class InternalTopicManagerTest {
}
@Test
+ public void shouldThrowInformativeExceptionForOlderBrokers() {
+ final AdminClient admin = new MockAdminClient() {
+ @Override
+ public CreateTopicsResult createTopics(final Collection<NewTopic>
newTopics,
+ final CreateTopicsOptions
options) {
+ final CreatableTopic topicToBeCreated = new CreatableTopic();
+ topicToBeCreated.setAssignments(new
CreatableReplicaAssignmentCollection());
+ topicToBeCreated.setNumPartitions((short) 1);
+ // set unsupported replication factor for older brokers
+ topicToBeCreated.setReplicationFactor((short) -1);
+
+ final CreatableTopicCollection topicsToBeCreated = new
CreatableTopicCollection();
+ topicsToBeCreated.add(topicToBeCreated);
+
+ try {
+ new CreateTopicsRequest.Builder(
+ new CreateTopicsRequestData()
+ .setTopics(topicsToBeCreated)
+ .setTimeoutMs(0)
+ .setValidateOnly(options.shouldValidateOnly()))
+ .build((short) 3); // pass in old unsupported request
version for old brokers
+
+ throw new IllegalStateException("Building
CreateTopicRequest should have thrown.");
+ } catch (final UnsupportedVersionException expected) {
+ final KafkaFutureImpl<TopicMetadataAndConfig> future = new
KafkaFutureImpl<>();
+ future.completeExceptionally(expected);
+
+ return new
CreateTopicsResult(Collections.singletonMap(topic1, future)) { };
+ }
+ }
+ };
+
+ final StreamsConfig streamsConfig = new StreamsConfig(config);
+ final InternalTopicManager topicManager = new
InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
+
+ final InternalTopicConfig topicConfig = new
RepartitionTopicConfig(topic1, Collections.emptyMap());
+ topicConfig.setNumberOfPartitions(1);
+
+ final StreamsException exception = assertThrows(
+ StreamsException.class,
+ () -> topicManager.makeReady(Collections.singletonMap(topic1,
topicConfig))
+ );
+ assertThat(
+ exception.getMessage(),
+ equalTo("Could not create topic " + topic1 + ", because brokers
don't support configuration replication.factor=-1."
+ + " You can change the replication.factor config or upgrade
your brokers to version 2.4 or newer to avoid this error."));
+ }
+
+ @Test
public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
setupTopicInMockAdminClient(topic1, Collections.emptyMap());
final InternalTopicConfig internalTopicConfig =
setupRepartitionTopicConfig(topic1, 1);