[ 
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16450337#comment-16450337
 ] 

ASF GitHub Bot commented on KAFKA-6677:
---------------------------------------

mjsax closed pull request #4868: KAFKA-6677: Fixed streamconfig producer's 
maxinflight allowed when EOS Enabled.
URL: https://github.com/apache/kafka/pull/4868
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6dede..e46d6d086ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@
  * <ul>
  *   <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} 
(read_committed) - Consumers will always read committed data only</li>
  *   <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} 
(true) - Producer will always have idempotency enabled</li>
- *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
"max.in.flight.requests.per.connection"} (1) - Producer will always have one 
in-flight request per connection</li>
+ *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 
"max.in.flight.requests.per.connection"} (5) - Producer will have at most 5 
in-flight requests per connection</li>
  * </ul>
  *
  *
@@ -650,7 +651,6 @@
         final Map<String, Object> tempProducerDefaultOverrides = new 
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
         tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, 
Integer.MAX_VALUE);
         
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
true);
-        
tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
 1);
 
         PRODUCER_EOS_OVERRIDES = 
Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
@@ -785,6 +785,10 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         // consumer/producer configurations, log a warning and remove the user 
defined value from the Map.
         // Thus the default values for these consumer/producer configurations 
that are suitable for
         // Streams will be used instead.
+        final Object maxInflightRequests = 
clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+        if (eosEnabled && maxInflightRequests != null && 5 < (int) 
maxInflightRequests) {
+            throw new 
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't 
exceed 5 when using the idempotent producer");
+        }
         for (final String config: nonConfigurableConfigs) {
             if (clientProvidedProps.containsKey(config)) {
                 final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is 
set to " + EXACTLY_ONCE + ". Hence, ";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 87f70759220..ef5e5a86be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -418,23 +418,6 @@ public void 
shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
         assertThat((Boolean) 
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
     }
 
-    @Test
-    public void 
shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
 {
-        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"anyValue");
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(1));
-    }
-
-    @Test
-    public void 
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
-        final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
-        assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(2));
-    }
-
     @Test
     public void shouldSetDifferentDefaultsIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
@@ -446,7 +429,6 @@ public void shouldSetDifferentDefaultsIfEosEnabled() {
         assertThat((String) 
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), 
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
         assertTrue((Boolean) 
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
         assertThat((Integer) 
producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
-        assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(1));
         
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), 
equalTo(100L));
     }
 
@@ -563,6 +545,18 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
         }
     }
 
+    @Test
+    public void 
shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        try {
+            streamsConfig.getProducerConfigs("clientId");
+            fail("Should throw ConfigException when Eos is enabled and 
maxInFlight requests exceeds 5");
+        } catch (final ConfigException e) {
+            assertEquals("max.in.flight.requests.per.connection can't exceed 5 
when using the idempotent producer", e.getMessage());
+        }
+    }
 
 
     static class MisconfiguredSerde implements Serde {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Remove EOS producer config max.in.flight.requests.per.connection=1
> ------------------------------------------------------------------
>
>                 Key: KAFKA-6677
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6677
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Jagadesh Adireddi
>            Priority: Major
>             Fix For: 1.2.0
>
>
> When EOS was introduced in 0.11, it was required to set producer config 
> max.in.flight.requests.per.connection=1 for idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to