[
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16450337#comment-16450337
]
ASF GitHub Bot commented on KAFKA-6677:
---------------------------------------
mjsax closed pull request #4868: KAFKA-6677: Fixed streamconfig producer's
maxinflight allowed when EOS Enabled.
URL: https://github.com/apache/kafka/pull/4868
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6dede..e46d6d086ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@
* <ul>
* <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"}
(read_committed) - Consumers will always read committed data only</li>
* <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"}
(true) - Producer will always have idempotency enabled</li>
- * <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
"max.in.flight.requests.per.connection"} (1) - Producer will always have one
in-flight request per connection</li>
+ * <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
"max.in.flight.requests.per.connection"} (5) - Producer will always have one
in-flight request per connection</li>
* </ul>
*
*
@@ -650,7 +651,6 @@
final Map<String, Object> tempProducerDefaultOverrides = new
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG,
Integer.MAX_VALUE);
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
true);
-
tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
1);
PRODUCER_EOS_OVERRIDES =
Collections.unmodifiableMap(tempProducerDefaultOverrides);
}
@@ -785,6 +785,10 @@ private void
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
// consumer/producer configurations, log a warning and remove the user
defined value from the Map.
// Thus the default values for these consumer/producer configurations
that are suitable for
// Streams will be used instead.
+ final Object maxInflightRequests =
clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+ if (eosEnabled && maxInflightRequests != null && 5 < (int)
maxInflightRequests) {
+ throw new
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't
exceed 5 when using the idempotent producer");
+ }
for (final String config: nonConfigurableConfigs) {
if (clientProvidedProps.containsKey(config)) {
final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is
set to " + EXACTLY_ONCE + ". Hence, ";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 87f70759220..ef5e5a86be7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -418,23 +418,6 @@ public void
shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
assertThat((Boolean)
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
}
- @Test
- public void
shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
{
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"anyValue");
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((Integer)
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
- }
-
- @Test
- public void
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((Integer)
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(2));
- }
-
@Test
public void shouldSetDifferentDefaultsIfEosEnabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
@@ -446,7 +429,6 @@ public void shouldSetDifferentDefaultsIfEosEnabled() {
assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
assertTrue((Boolean)
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertThat((Integer)
producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
- assertThat((Integer)
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG),
equalTo(100L));
}
@@ -563,6 +545,18 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
}
}
+ @Test
+ public void
shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ try {
+ streamsConfig.getProducerConfigs("clientId");
+ fail("Should throw ConfigException when Eos is enabled and
maxInFlight requests exceeds 5");
+ } catch (final ConfigException e) {
+ assertEquals("max.in.flight.requests.per.connection can't exceed 5
when using the idempotent producer", e.getMessage());
+ }
+ }
static class MisconfiguredSerde implements Serde {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Remove EOS producer config max.in.flight.request.per.connection=1
> -----------------------------------------------------------------
>
> Key: KAFKA-6677
> URL: https://issues.apache.org/jira/browse/KAFKA-6677
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Jagadesh Adireddi
> Priority: Major
> Fix For: 1.2.0
>
>
> When EOS was introduced in 0.11, it was required to set producer config
> max.in.flight.requests.per.connection=1 for idempotent producer.
> This limitations as fixed in 1.0 release via KAFKA-5494
> Thus, we should remove this setting in Kafka Streams if EOS get's enabled.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)