This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a05e336 KAFKA-6677: Fixed StreamsConfig producer's max-in-flight
allowed when EOS enabled. (#4868)
a05e336 is described below
commit a05e33693b66ac38ccb21f2238c194ca59fcb6ec
Author: Jagadesh Adireddi <[email protected]>
AuthorDate: Tue Apr 24 23:43:12 2018 +0530
KAFKA-6677: Fixed StreamsConfig producer's max-in-flight allowed when EOS
enabled. (#4868)
Reviewers: Matthias J Sax <matthias@confluent.io>, Bill Bejeck
<[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 8 ++++--
.../apache/kafka/streams/StreamsConfigTest.java | 30 +++++++++-------------
2 files changed, 18 insertions(+), 20 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6..e46d6d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@ import static
org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
* <ul>
* <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"}
(read_committed) - Consumers will always read committed data only</li>
* <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"}
(true) - Producer will always have idempotency enabled</li>
- * <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
"max.in.flight.requests.per.connection"} (1) - Producer will always have one
in-flight request per connection</li>
+ * <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
"max.in.flight.requests.per.connection"} (5) - Producer will have at most five
in-flight requests per connection</li>
* </ul>
*
*
@@ -650,7 +651,6 @@ public class StreamsConfig extends AbstractConfig {
final Map<String, Object> tempProducerDefaultOverrides = new
HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG,
Integer.MAX_VALUE);
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
true);
-
tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
1);
PRODUCER_EOS_OVERRIDES =
Collections.unmodifiableMap(tempProducerDefaultOverrides);
}
@@ -785,6 +785,10 @@ public class StreamsConfig extends AbstractConfig {
// consumer/producer configurations, log a warning and remove the user
defined value from the Map.
// Thus the default values for these consumer/producer configurations
that are suitable for
// Streams will be used instead.
+ final Object maxInflightRequests =
clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+ if (eosEnabled && maxInflightRequests != null && 5 < (int)
maxInflightRequests) {
+ throw new
ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't
exceed 5 when using the idempotent producer");
+ }
for (final String config: nonConfigurableConfigs) {
if (clientProvidedProps.containsKey(config)) {
final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is
set to " + EXACTLY_ONCE + ". Hence, ";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 87f7075..ef5e5a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -419,23 +419,6 @@ public class StreamsConfigTest {
}
@Test
- public void
shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
{
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"anyValue");
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((Integer)
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
- }
-
- @Test
- public void
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs =
streamsConfig.getProducerConfigs("clientId");
- assertThat((Integer)
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(2));
- }
-
- @Test
public void shouldSetDifferentDefaultsIfEosEnabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -446,7 +429,6 @@ public class StreamsConfigTest {
assertThat((String)
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
assertTrue((Boolean)
producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertThat((Integer)
producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
- assertThat((Integer)
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG),
equalTo(100L));
}
@@ -563,6 +545,18 @@ public class StreamsConfigTest {
}
}
+ @Test
+ public void
shouldThrowExceptionIfMaxInflightRequestsGreatherThanFiveIfEosEnabled() {
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ try {
+ streamsConfig.getProducerConfigs("clientId");
+ fail("Should throw ConfigException when Eos is enabled and
maxInFlight requests exceeds 5");
+ } catch (final ConfigException e) {
+ assertEquals("max.in.flight.requests.per.connection can't exceed 5
when using the idempotent producer", e.getMessage());
+ }
+ }
static class MisconfiguredSerde implements Serde {
--
To stop receiving notification emails like this one, please contact
[email protected].