This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b56b848 KAFKA-8928: Logged producer config does not always match
actual configured values (#7466)
b56b848 is described below
commit b56b848682079052e98ecde9a15a8f6b8860c599
Author: huxi <[email protected]>
AuthorDate: Thu Jan 2 02:49:21 2020 +0800
KAFKA-8928: Logged producer config does not always match actual configured
values (#7466)
Some logged producer configs(clientId, acks, retries) might not be
reflected the actual values.
Reviewers: Guozhang Wang <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 93 ++++++----------------
.../kafka/clients/producer/ProducerConfig.java | 70 +++++++++++++++-
.../apache/kafka/common/config/AbstractConfig.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 16 ++++
4 files changed, 110 insertions(+), 71 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5d749a..66a1103 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -81,7 +81,6 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -230,7 +229,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class KafkaProducer<K, V> implements Producer<K, V> {
private final Logger log;
- private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new
AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
public static final String NETWORK_THREAD_PREFIX =
"kafka-producer-network-thread";
public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
@@ -333,7 +331,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
String transactionalId =
userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String)
userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
- this.clientId =
buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG),
transactionalId);
+ this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
if (transactionalId == null)
@@ -433,19 +431,9 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
}
}
- private static String buildClientId(String configuredClientId, String
transactionalId) {
- if (!configuredClientId.isEmpty())
- return configuredClientId;
-
- if (transactionalId != null)
- return "producer-" + transactionalId;
-
- return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
- }
-
// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient,
ProducerMetadata metadata) {
- int maxInflightRequests = configureInflightRequests(producerConfig,
transactionManager != null);
+ int maxInflightRequests = configureInflightRequests(producerConfig);
int requestTimeoutMs =
producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder =
ClientUtils.createChannelBuilder(producerConfig, time);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
@@ -467,8 +455,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
apiVersions,
throttleTimeSensor,
logContext);
- int retries = configureRetries(producerConfig, transactionManager !=
null, log);
- short acks = configureAcks(producerConfig, transactionManager != null,
log);
+ int retries = configureRetries(producerConfig, log);
+ short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
@@ -516,23 +504,13 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
TransactionManager transactionManager = null;
- boolean userConfiguredIdempotence = false;
- if
(config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
- userConfiguredIdempotence = true;
-
- boolean userConfiguredTransactions = false;
- if
(config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
- userConfiguredTransactions = true;
+ boolean userConfiguredIdempotence =
config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+ boolean userConfiguredTransactions =
config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ if (userConfiguredTransactions && !userConfiguredIdempotence)
+ log.info("Overriding the default {} to true since {} is
specified.", ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG);
- boolean idempotenceEnabled =
config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
-
- if (!idempotenceEnabled && userConfiguredIdempotence &&
userConfiguredTransactions)
- throw new ConfigException("Cannot set a " +
ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
-
- if (userConfiguredTransactions)
- idempotenceEnabled = true;
-
- if (idempotenceEnabled) {
+ if (config.idempotenceEnabled()) {
String transactionalId =
config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs =
config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs =
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -542,63 +520,40 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
else
log.info("Instantiated an idempotent producer.");
}
-
return transactionManager;
}
- private static int configureRetries(ProducerConfig config, boolean
idempotenceEnabled, Logger log) {
- boolean userConfiguredRetries = false;
- if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
- userConfiguredRetries = true;
- }
- if (idempotenceEnabled && !userConfiguredRetries) {
- // We recommend setting infinite retries when the idempotent
producer is enabled, so it makes sense to make
- // this the default.
+ private static int configureRetries(ProducerConfig config, Logger log) {
+ boolean userConfiguredRetries =
config.originals().containsKey(ProducerConfig.RETRIES_CONFIG);
+ if (config.idempotenceEnabled() && !userConfiguredRetries) {
log.info("Overriding the default retries config to the recommended
value of {} since the idempotent " +
"producer is enabled.", Integer.MAX_VALUE);
- return Integer.MAX_VALUE;
- }
- if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG)
== 0) {
- throw new ConfigException("Must set " +
ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent
producer.");
}
return config.getInt(ProducerConfig.RETRIES_CONFIG);
}
- private static int configureInflightRequests(ProducerConfig config,
boolean idempotenceEnabled) {
- if (idempotenceEnabled && 5 <
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+ private static int configureInflightRequests(ProducerConfig config) {
+ if (config.idempotenceEnabled() && 5 <
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
throw new ConfigException("Must set " +
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to at most 5" +
" to use the idempotent producer.");
}
return
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
}
- private static short configureAcks(ProducerConfig config, boolean
idempotenceEnabled, Logger log) {
- boolean userConfiguredAcks = false;
- short acks = (short)
parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
- if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
- userConfiguredAcks = true;
- }
-
- if (idempotenceEnabled && !userConfiguredAcks) {
- log.info("Overriding the default {} to all since idempotence is
enabled.", ProducerConfig.ACKS_CONFIG);
- return -1;
- }
+ private static short configureAcks(ProducerConfig config, Logger log) {
+ boolean userConfiguredAcks =
config.originals().containsKey(ProducerConfig.ACKS_CONFIG);
+ short acks =
Short.valueOf(config.getString(ProducerConfig.ACKS_CONFIG));
- if (idempotenceEnabled && acks != -1) {
- throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG
+ " to all in order to use the idempotent " +
- "producer. Otherwise we cannot guarantee idempotence.");
+ if (config.idempotenceEnabled()) {
+ if (!userConfiguredAcks)
+ log.info("Overriding the default {} to all since idempotence
is enabled.", ProducerConfig.ACKS_CONFIG);
+ else if (acks != -1)
+ throw new ConfigException("Must set " +
ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
+ "producer. Otherwise we cannot guarantee
idempotence.");
}
return acks;
}
- private static int parseAcks(String acksString) {
- try {
- return acksString.trim().equalsIgnoreCase("all") ? -1 :
Integer.parseInt(acksString.trim());
- } catch (NumberFormatException e) {
- throw new ConfigException("Invalid configuration value for 'acks':
" + acksString);
- }
- }
-
/**
* Needs to be called before any other methods when the transactional.id
is set in the configuration.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index fc1e6a7..75af5d6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
@@ -32,6 +33,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
@@ -249,6 +251,8 @@ public class ProducerConfig extends AbstractConfig {
public static final String SECURITY_PROVIDERS_CONFIG =
SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
+ private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new
AtomicInteger(1);
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST,
Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -377,7 +381,61 @@ public class ProducerConfig extends AbstractConfig {
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String,
Object> parsedValues) {
- return CommonClientConfigs.postProcessReconnectBackoffConfigs(this,
parsedValues);
+ Map<String, Object> refinedConfigs =
CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
+ maybeOverrideEnableIdempotence(refinedConfigs);
+ maybeOverrideClientId(refinedConfigs);
+ maybeOverrideAcksAndRetries(refinedConfigs);
+ return refinedConfigs;
+ }
+
+ private void maybeOverrideClientId(final Map<String, Object> configs) {
+ String refinedClientId;
+ boolean userConfiguredClientId =
this.originals().containsKey(CLIENT_ID_CONFIG);
+ if (userConfiguredClientId) {
+ refinedClientId = this.getString(CLIENT_ID_CONFIG);
+ } else {
+ String transactionalId = this.getString(TRANSACTIONAL_ID_CONFIG);
+ refinedClientId = "producer-" + (transactionalId != null ?
transactionalId : PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement());
+ }
+ configs.put(CLIENT_ID_CONFIG, refinedClientId);
+ }
+
+ private void maybeOverrideEnableIdempotence(final Map<String, Object>
configs) {
+ boolean userConfiguredIdempotence =
this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+ boolean userConfiguredTransactions =
this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
+
+ if (userConfiguredTransactions && !userConfiguredIdempotence) {
+ configs.put(ENABLE_IDEMPOTENCE_CONFIG, true);
+ }
+ }
+
+ private void maybeOverrideAcksAndRetries(final Map<String, Object>
configs) {
+ final String acksStr = parseAcks(this.getString(ACKS_CONFIG));
+ configs.put(ACKS_CONFIG, acksStr);
+ // For idempotence producers, values for `RETRIES_CONFIG` and
`ACKS_CONFIG` might need to be overridden.
+ if (idempotenceEnabled()) {
+ boolean userConfiguredRetries =
this.originals().containsKey(RETRIES_CONFIG);
+ if (this.getInt(RETRIES_CONFIG) == 0) {
+ throw new ConfigException("Must set " +
ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent
producer.");
+ }
+ configs.put(RETRIES_CONFIG, userConfiguredRetries ?
this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE);
+
+ boolean userConfiguredAcks =
this.originals().containsKey(ACKS_CONFIG);
+ final short acks = Short.valueOf(acksStr);
+ if (userConfiguredAcks && acks != (short) -1) {
+ throw new ConfigException("Must set " + ACKS_CONFIG + " to all
in order to use the idempotent " +
+ "producer. Otherwise we cannot guarantee
idempotence.");
+ }
+ configs.put(ACKS_CONFIG, "-1");
+ }
+ }
+
+ private static String parseAcks(String acksString) {
+ try {
+ return acksString.trim().equalsIgnoreCase("all") ? "-1" :
Short.parseShort(acksString.trim()) + "";
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid configuration value for 'acks':
" + acksString);
+ }
}
public static Map<String, Object> addSerializerToConfig(Map<String,
Object> configs,
@@ -410,6 +468,16 @@ public class ProducerConfig extends AbstractConfig {
super(CONFIG, props);
}
+ boolean idempotenceEnabled() {
+ boolean userConfiguredIdempotence =
this.originals().containsKey(ENABLE_IDEMPOTENCE_CONFIG);
+ boolean userConfiguredTransactions =
this.originals().containsKey(TRANSACTIONAL_ID_CONFIG);
+ boolean idempotenceEnabled = userConfiguredIdempotence &&
this.getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
+
+ if (!idempotenceEnabled && userConfiguredIdempotence &&
userConfiguredTransactions)
+ throw new ConfigException("Cannot set a " +
ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+ return userConfiguredTransactions || idempotenceEnabled;
+ }
+
ProducerConfig(Map<?, ?> props, boolean doLog) {
super(CONFIG, props, doLog);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 3992a41..819770b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -106,12 +106,12 @@ public class AbstractConfig {
this.originals = resolveConfigVariables(configProviderProps,
(Map<String, Object>) originals);
this.values = definition.parse(this.originals);
+ this.used = Collections.synchronizedSet(new HashSet<>());
Map<String, Object> configUpdates =
postProcessParsedConfig(Collections.unmodifiableMap(this.values));
for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
this.values.put(update.getKey(), update.getValue());
}
definition.parse(this.values);
- this.used = Collections.synchronizedSet(new HashSet<>());
this.definition = definition;
if (doLog)
logAll();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index eee067f..92ae90b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -114,6 +114,22 @@ public class KafkaProducerTest {
Collections.emptySet());
@Test
+ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
+ Properties props = new Properties();
+ props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"transactionalId");
+ props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new
StringSerializer().getClass().getName());
+ props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new
StringSerializer().getClass().getName());
+
+ ProducerConfig config = new ProducerConfig(props);
+
assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
+ assertTrue(Arrays.asList("-1", "all").stream().anyMatch(each ->
each.equalsIgnoreCase(config.getString(ProducerConfig.ACKS_CONFIG))));
+ assertTrue(config.getInt(ProducerConfig.RETRIES_CONFIG) ==
Integer.MAX_VALUE);
+
assertTrue(config.getString(ProducerConfig.CLIENT_ID_CONFIG).equalsIgnoreCase("producer-"
+
+ config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
+ }
+
+ @Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");