This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b543c74  KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
b543c74 is described below

commit b543c74e6a0a86b61a25a2c39ac853084552053c
Author: Dhruvil Shah <dhru...@confluent.io>
AuthorDate: Tue Jun 12 16:29:50 2018 -0700

    KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
    
    Adds a configuration that specifies the default timeout for KafkaConsumer APIs that could block. This was introduced in KIP-266.
    
    Reviewers: Satish Duggana <satish.dugg...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>
---
 .../kafka/clients/consumer/ConsumerConfig.java     | 10 +++++
 .../kafka/clients/consumer/KafkaConsumer.java      | 52 +++++++++++++++-------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +
 docs/upgrade.html                                  | 11 +++--
 4 files changed, 54 insertions(+), 21 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 72e496c..bc9a716 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -218,6 +218,10 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_MS_CONFIG = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>default.api.timeout.ms</code> */
+    public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = 
"default.api.timeout.ms";
+    public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the 
timeout (in milliseconds) for consumer APIs that could block. This 
configuration is used as the default timeout for all consumer operations that 
do not explicitly accept a <code>timeout</code> parameter.";
+
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = 
"interceptor.classes";
     public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to 
use as interceptors. "
@@ -403,6 +407,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        60 * 1000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        DEFAULT_API_TIMEOUT_MS_DOC)
                                 /* default is set to be a bit lower than the 
server default (10 min), to avoid both client and server closing connection at 
same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5bd6b93..d6973c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -567,6 +567,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
+    private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
     private List<PartitionAssignor> assignors;
 
@@ -666,6 +667,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
             int sessionTimeOutMs = 
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             int fetchMaxWaitMs = 
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             if (this.requestTimeoutMs <= sessionTimeOutMs || 
this.requestTimeoutMs <= fetchMaxWaitMs)
@@ -814,6 +816,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   Metadata metadata,
                   long retryBackoffMs,
                   long requestTimeoutMs,
+                  int defaultApiTimeoutMs,
                   List<PartitionAssignor> assignors) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
@@ -829,6 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
     }
 
@@ -1268,8 +1272,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * every rebalance and also on startup. As such, if you need to store 
offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * This is a synchronous commits and will block until either the commit 
succeeds or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * This is a synchronous commit and will block until either the commit 
succeeds, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the timeout 
specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link 
org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      * <p>
      * Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
@@ -1286,10 +1291,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
specified by {@code default.api.timeout.ms} expires
+     *            before successful completion of the offset commit
      */
     @Override
     public void commitSync() {
-        commitSync(Duration.ofMillis(Long.MAX_VALUE));
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1343,7 +1350,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is a synchronous commits and will block until either the commit 
succeeds or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * encountered (in which case it is thrown to the caller), or the timeout 
specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link 
org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      * <p>
      * Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
@@ -1362,10 +1370,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws java.lang.IllegalArgumentException if the committed offset is 
negative
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
expires before successful completion
+     *            of the offset commit
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
-        commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1560,7 +1570,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * This method may issue a remote call to the server if there is no 
current position for the given partition.
      * <p>
      * This call will block until either the position could be determined or 
an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * encountered (in which case it is thrown to the caller), or the timeout 
specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link 
org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      *
      * @param partition The partition to get the position for
      * @return The current position of the consumer (that is, the offset of 
the next record to be fetched)
@@ -1575,10 +1586,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position 
cannot be determined before the
+     *             timeout specified by {@code default.api.timeout.ms} expires
      */
     @Override
     public long position(TopicPartition partition) {
-        return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1641,7 +1654,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * Get the last committed offset for the given partition (whether the 
commit happened by this process or
      * another). This offset will be used as the position for the consumer in 
the event of a failure.
      * <p>
-     * This call will block to do a remote call to get the latest committed 
offsets from the server.
+     * This call will do a remote call to get the latest committed offset from 
the server, and will block until the
+     * committed offset is gotten successfully, an unrecoverable error is 
encountered (in which case it is thrown to
+     * the caller), or the timeout specified by {@code default.api.timeout.ms} 
expires (in which case a
+     * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to 
the caller).
      *
      * @param partition The partition to check
      * @return The last committed offset and metadata or null if there was no 
prior commit
@@ -1653,10 +1669,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+     *             the timeout specified by {@code default.api.timeout.ms} 
expires.
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
-        return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+        return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1718,11 +1736,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the specified topic. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires.
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1774,11 +1792,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      *             this function is called
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires.
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
-        return listTopics(Duration.ofMillis(requestTimeoutMs));
+        return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1879,13 +1897,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires.
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not support looking up
      *         the offsets by timestamp
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        return offsetsForTimes(timestampsToSearch, 
Duration.ofMillis(requestTimeoutMs));
+        return offsetsForTimes(timestampsToSearch, 
Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1939,11 +1957,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     *         expiration of the configured {@code default.api.timeout.ms}
      */
     @Override
     public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions) {
-        return beginningOffsets(partitions, 
Duration.ofMillis(requestTimeoutMs));
+        return beginningOffsets(partitions, 
Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4be6884..b8681e8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1748,6 +1748,7 @@ public class KafkaConsumerTest {
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         long requestTimeoutMs = 30000;
+        int defaultApiTimeoutMs = 30000;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
         int maxBytes = Integer.MAX_VALUE;
@@ -1825,6 +1826,7 @@ public class KafkaConsumerTest {
                 metadata,
                 retryBackoffMs,
                 requestTimeoutMs,
+                defaultApiTimeoutMs,
                 assignors);
     }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3c75bad..7061d6c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -98,10 +98,13 @@
         
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
         <code>internal.value.converter.schemas.enable=false</code>
     </li>
-    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds 
overloads to the consumer to support
-        timeout behavior for blocking APIs. In particular, a new 
<code>poll(Duration)</code> API has been added which
-        does not block for dynamic partition assignment. The old 
<code>poll(long)</code> API has been deprecated and
-        will be removed in a future version.</li>
+    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds 
a new consumer configuration <code>default.api.timeout.ms</code>
+        to specify the default timeout to use for <code>KafkaConsumer</code> 
APIs that could block. The KIP also adds overloads for such blocking
+        APIs to support specifying a specific timeout to use for each of them 
instead of using the default timeout set by <code>default.api.timeout.ms</code>.
+        In particular, a new <code>poll(Duration)</code> API has been added 
which does not block for dynamic partition assignment.
+        The old <code>poll(long)</code> API has been deprecated and will be 
removed in a future version. Overloads have also been added
+        for other <code>KafkaConsumer</code> methods like 
<code>partitionsFor</code>, <code>listTopics</code>, 
<code>offsetsForTimes</code>,
+        <code>beginningOffsets</code>, <code>endOffsets</code> and 
<code>close</code> that take in a <code>Duration</code>.</li>
     <li>The internal method 
<code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. 
Users are encouraged to migrate to 
<code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
     <li>The AclCommand tool <code>--producer</code> convenience option uses 
the <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a>
 finer grained ACL on the given topic. </li>

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to