This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a5992a  KAFKA-8531: Change default replication factor config (#10532)
6a5992a is described below

commit 6a5992a814364a41ce7ad34aa6b27cc44d0eb78c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed May 5 16:16:06 2021 -0700

    KAFKA-8531: Change default replication factor config (#10532)
    
    Implements KIP-733
    
    Reviewers: A. Sophie Blee-Goldman <[email protected]>
---
 docs/streams/developer-guide/config-streams.html   |  6 +--
 docs/streams/upgrade-guide.html                    |  7 ++-
 .../org/apache/kafka/streams/StreamsConfig.java    | 14 +++---
 .../processor/internals/InternalTopicManager.java  | 30 ++++++++++--
 .../internals/InternalTopicManagerTest.java        | 57 ++++++++++++++++++++++
 5 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index 1f5002f..1f8e8f3 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -304,10 +304,10 @@
             <td>100 milliseconds</td>
           </tr>
           <tr class="row-even"><td>replication.factor</td>
-            <td>High</td>
+            <td>Medium</td>
             <td colspan="2">The replication factor for changelog topics and 
repartition topics created by the application.
-              If your broker cluster is on version 2.4 or newer, you can set 
-1 to use the broker default replication factor.</td>
-            <td>1</td>
+              The default of <code>-1</code> (meaning: use broker default 
replication factor) requires broker version 2.4 or newer.</td>
+            <td>-1</td>
           </tr>
           <tr class="row-even"><td>retry.backoff.ms</td>
               <td>Medium</td>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1990d72..6f13cc5 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -144,7 +144,12 @@
         <li>Connect-json: Kafka Streams no longer has a compile time 
dependency on the "connect:json" module (<a 
href="https://issues.apache.org/jira/browse/KAFKA-5146";>KAFKA-5146</a>).
             Projects that were relying on this transitive dependency will have 
to explicitly declare it.</li>
     </ul>
-    
+    <p>
+      The default value for configuration parameter 
<code>replication.factor</code> was changed to <code>-1</code>
+      (meaning: use broker default replication factor).
+      The <code>replication.factor</code> value of <code>-1</code> requires 
broker version 2.4 or newer.
+    </p>
+
     <h3><a id="streams_api_changes_280" 
href="#streams_api_changes_280">Streams API changes in 2.8.0</a></h3>
     <p>
         We extended <code>StreamJoined</code> to include the options 
<code>withLoggingEnabled()</code> and <code>withLoggingDisabled()</code> in
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 89154c7..0e9e6a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -501,8 +501,8 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code replication.factor} */
     @SuppressWarnings("WeakerAccess")
     public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
-    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for change log topics and repartition topics created by the stream 
processing application." +
-        " If your broker cluster is on version 2.4 or newer, you can set -1 to 
use the broker default replication factor.";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for change log topics and repartition topics created by the stream 
processing application."
+        + " The default of <code>-1</code> (meaning: use broker default 
replication factor) requires broker version 2.4 or newer";
 
     /** {@code request.timeout.ms} */
     @SuppressWarnings("WeakerAccess")
@@ -608,11 +608,6 @@ public class StreamsConfig extends AbstractConfig {
                     Type.LIST,
                     Importance.HIGH,
                     CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
-            .define(REPLICATION_FACTOR_CONFIG,
-                    Type.INT,
-                    1,
-                    Importance.HIGH,
-                    REPLICATION_FACTOR_DOC)
             .define(STATE_DIR_CONFIG,
                     Type.STRING,
                     System.getProperty("java.io.tmpdir") + File.separator + 
"kafka-streams",
@@ -700,6 +695,11 @@ public class StreamsConfig extends AbstractConfig {
                     in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
                     Importance.MEDIUM,
                     PROCESSING_GUARANTEE_DOC)
+            .define(REPLICATION_FACTOR_CONFIG,
+                    Type.INT,
+                    -1,
+                    Importance.MEDIUM,
+                    REPLICATION_FACTOR_DOC)
             .define(SECURITY_PROTOCOL_CONFIG,
                     Type.STRING,
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 2ca130a..c612245 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -35,6 +35,7 @@ import 
org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -439,13 +440,34 @@ public class InternalTopicManager {
                         final Throwable cause = executionException.getCause();
                         if (cause instanceof TopicExistsException) {
                             // This topic didn't exist earlier or its leader 
not known before; just retain it for next round of validation.
-                            log.info("Could not create topic {}. Topic is 
probably marked for deletion (number of partitions is unknown).\n" +
-                                "Will retry to create this topic in {} ms (to 
let broker finish async delete operation first).\n" +
-                                "Error message was: {}", topicName, 
retryBackOffMs, cause.toString());
+                            log.info(
+                                "Could not create topic {}. Topic is probably 
marked for deletion (number of partitions is unknown).\n"
+                                    +
+                                    "Will retry to create this topic in {} ms 
(to let broker finish async delete operation first).\n"
+                                    +
+                                    "Error message was: {}", topicName, 
retryBackOffMs,
+                                cause.toString());
                         } else {
                             log.error("Unexpected error during topic creation 
for {}.\n" +
                                 "Error message was: {}", topicName, 
cause.toString());
-                            throw new StreamsException(String.format("Could 
not create topic %s.", topicName), cause);
+
+                            if (cause instanceof UnsupportedVersionException) {
+                                final String errorMessage = cause.getMessage();
+                                if (errorMessage != null &&
+                                    errorMessage.startsWith("Creating topics 
with default partitions/replication factor are only supported in 
CreateTopicRequest version 4+")) {
+
+                                    throw new StreamsException(String.format(
+                                        "Could not create topic %s, because 
brokers don't support configuration replication.factor=-1."
+                                            + " You can change the 
replication.factor config or upgrade your brokers to version 2.4 or newer to 
avoid this error.",
+                                        topicName)
+                                    );
+                                }
+                            } else {
+                                throw new StreamsException(
+                                    String.format("Could not create topic 
%s.", topicName),
+                                    cause
+                                );
+                            }
                         }
                     } catch (final TimeoutException retriableException) {
                         log.error("Creating topic {} timed out.\n" +
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 653250c..5b2b64e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import 
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
@@ -40,7 +41,13 @@ import 
org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignmentCollection;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -53,6 +60,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -223,6 +231,55 @@ public class InternalTopicManagerTest {
     }
 
     @Test
+    public void shouldThrowInformativeExceptionForOlderBrokers() {
+        final AdminClient admin = new MockAdminClient() {
+            @Override
+            public CreateTopicsResult createTopics(final Collection<NewTopic> 
newTopics,
+                                                   final CreateTopicsOptions 
options) {
+                final CreatableTopic topicToBeCreated = new CreatableTopic();
+                topicToBeCreated.setAssignments(new 
CreatableReplicaAssignmentCollection());
+                topicToBeCreated.setNumPartitions((short) 1);
+                // set unsupported replication factor for older brokers
+                topicToBeCreated.setReplicationFactor((short) -1);
+
+                final CreatableTopicCollection topicsToBeCreated = new 
CreatableTopicCollection();
+                topicsToBeCreated.add(topicToBeCreated);
+
+                try {
+                    new CreateTopicsRequest.Builder(
+                        new CreateTopicsRequestData()
+                            .setTopics(topicsToBeCreated)
+                            .setTimeoutMs(0)
+                            .setValidateOnly(options.shouldValidateOnly()))
+                        .build((short) 3); // pass in old unsupported request 
version for old brokers
+
+                    throw new IllegalStateException("Building 
CreateTopicRequest should have thrown.");
+                } catch (final UnsupportedVersionException expected) {
+                    final KafkaFutureImpl<TopicMetadataAndConfig> future = new 
KafkaFutureImpl<>();
+                    future.completeExceptionally(expected);
+
+                    return new 
CreateTopicsResult(Collections.singletonMap(topic1, future)) { };
+                }
+            }
+        };
+
+        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        final InternalTopicManager topicManager = new 
InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
+
+        final InternalTopicConfig topicConfig = new 
RepartitionTopicConfig(topic1, Collections.emptyMap());
+        topicConfig.setNumberOfPartitions(1);
+
+        final StreamsException exception = assertThrows(
+            StreamsException.class,
+            () -> topicManager.makeReady(Collections.singletonMap(topic1, 
topicConfig))
+        );
+        assertThat(
+            exception.getMessage(),
+            equalTo("Could not create topic " + topic1 + ", because brokers 
don't support configuration replication.factor=-1."
+                + " You can change the replication.factor config or upgrade 
your brokers to version 2.4 or newer to avoid this error."));
+    }
+
+    @Test
     public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
         setupTopicInMockAdminClient(topic1, Collections.emptyMap());
         final InternalTopicConfig internalTopicConfig = 
setupRepartitionTopicConfig(topic1, 1);

Reply via email to