This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a05e336  KAFKA-6677: Fixed StreamsConfig producer's max-in-flight 
allowed when EOS enabled. (#4868)
a05e336 is described below

commit a05e33693b66ac38ccb21f2238c194ca59fcb6ec
Author: Jagadesh Adireddi <[email protected]>
AuthorDate: Tue Apr 24 23:43:12 2018 +0530

    KAFKA-6677: Fixed StreamsConfig producer's max-in-flight allowed when EOS 
enabled. (#4868)
    
    Reviewers: Matthias J Sax <matthias@confluentio>, Bill Bejeck 
<[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  8 ++++--
 .../apache/kafka/streams/StreamsConfigTest.java    | 30 +++++++++-------------
 2 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6..e46d6d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@ import static 
org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  * <ul>
  *   <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} 
(read_committed) - Consumers will always read committed data only</li>
  *   <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} 
(true) - Producer will always have idempotency enabled</li>
- *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
"max.in.flight.requests.per.connection"} (1) - Producer will always have one 
in-flight request per connection</li>
+ *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
"max.in.flight.requests.per.connection"} (5) - Producer will always have one 
in-flight request per connection</li>
  * </ul>
  *
  *
@@ -650,7 +651,6 @@ public class StreamsConfig extends AbstractConfig {
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
         tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, 
Integer.MAX_VALUE);
         
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        
tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
 1);
 
         PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
@@ -785,6 +785,10 @@ public class StreamsConfig extends AbstractConfig {
         // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
         // Thus the default values for these consumer/producer configurations 
that are suitable for
         // Streams will be used instead.
+        final Object maxInflightRequests = 
clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+        if (eosEnabled && maxInflightRequests != null && 5 < (int) 
maxInflightRequests) {
+            throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't 
exceed 5 when using the idempotent producer");
+        }
         for (final String config: nonConfigurableConfigs) {
             if (clientProvidedProps.containsKey(config)) {
                 final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is 
set to " + EXACTLY_ONCE + ". Hence, ";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 87f7075..ef5e5a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -419,23 +419,6 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void 
shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
 {
-        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"anyValue");
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(1));
-    }
-
-    @Test
-    public void 
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(2));
-    }
-
-    @Test
     public void shouldSetDifferentDefaultsIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -446,7 +429,6 @@ public class StreamsConfigTest {
         assertThat((String) 
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), 
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
         assertTrue((Boolean) 
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
         assertThat((Integer) 
producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
-        assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(1));
         
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), 
equalTo(100L));
     }
 
@@ -563,6 +545,18 @@ public class StreamsConfigTest {
         }
     }
 
+    @Test
+    public void 
shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        try {
+            streamsConfig.getProducerConfigs("clientId");
+            fail("Should throw ConfigException when Eos is enabled and 
maxInFlight requests exceeds 5");
+        } catch (final ConfigException e) {
+            assertEquals("max.in.flight.requests.per.connection can't exceed 5 
when using the idempotent producer", e.getMessage());
+        }
+    }
 
 
     static class MisconfiguredSerde implements Serde {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to