This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 979c73c  CAMEL-12019: camel-kafka - Add option max.poll.interval.ms
979c73c is described below

commit 979c73c326616c9447d0df2eec54a606b2785527
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Nov 21 14:43:24 2017 +0100

    CAMEL-12019: camel-kafka - Add option max.poll.interval.ms
---
 .../camel-kafka/src/main/docs/kafka-component.adoc      |  3 ++-
 .../camel/component/kafka/KafkaConfiguration.java       | 17 +++++++++++++++++
 .../kafka/springboot/KafkaComponentConfiguration.java   | 17 +++++++++++++++++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d557958..216911b 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 | *topic* | *Required* Name of the topic to use. On the consumer you can use 
comma to separate multiple topics. A producer can only send a message to a 
single topic. |  | String
 |===
 
-==== Query Parameters (86 parameters):
+==== Query Parameters (87 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -93,6 +93,7 @@ with the following path and query parameters:
 | *heartbeatIntervalMs* (consumer) | The expected time between heartbeats to 
the consumer coordinator when using Kafka's group management facilities. 
Heartbeats are used to ensure that the consumer's session stays active and to 
facilitate rebalancing when new consumers join or leave the group. The value 
must be set lower than session.timeout.ms but typically should be set no higher 
than 1/3 of that value. It can be adjusted even lower to control the expected 
time for normal rebalances. | [...]
 | *keyDeserializer* (consumer) | Deserializer class for key that implements 
the Deserializer interface. | 
org.apache.kafka.common.serialization.StringDeserializer | String
 | *maxPartitionFetchBytes* (consumer) | The maximum amount of data 
per-partition the server will return. The maximum total memory used for a 
request will be partitions max.partition.fetch.bytes. This size must be at 
least as large as the maximum message size the server allows or else it is 
possible for the producer to send messages larger than the consumer can fetch. 
If that happens the consumer can get stuck trying to fetch a large message on a 
certain partition. | 1048576 | Integer
+| *maxPollIntervalMs* (consumer) | The maximum delay between invocations of 
poll() when using consumer group management. This places an upper bound on the 
amount of time that the consumer can be idle before fetching more records. If 
poll() is not called before expiration of this timeout then the consumer is 
considered failed and the group will rebalance in order to reassign the 
partitions to another member. |  | Long
 | *maxPollRecords* (consumer) | The maximum number of records returned in a 
single call to poll() | 500 | Integer
 | *offsetRepository* (consumer) | The offset repository to use in order to 
locally store the offset of each partition of the topic. Defining one will 
disable the autocommit. |  | String>
 | *partitionAssignor* (consumer) | The class name of the partition assignment 
strategy that the client will use to distribute partition ownership amongst 
consumer instances when group management is used | 
org.apache.kafka.clients.consumer.RangeAssignor | String
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index e724ef6..f82efb7 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -89,6 +89,8 @@ public class KafkaConfiguration implements Cloneable {
     private Integer maxPollRecords;
     @UriParam(label = "consumer", defaultValue = "5000")
     private Long pollTimeoutMs = 5000L;
+    @UriParam(label = "consumer")
+    private Long maxPollIntervalMs;
     //auto.offset.reset1
     @UriParam(label = "consumer", defaultValue = "latest", enums = 
"latest,earliest,none")
     private String autoOffsetReset = "latest";
@@ -372,6 +374,7 @@ public class KafkaConfiguration implements Cloneable {
         addPropertyIfNotNull(props, 
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
         addPropertyIfNotNull(props, 
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
         addPropertyIfNotNull(props, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
getSessionTimeoutMs());
+        addPropertyIfNotNull(props, 
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
         addPropertyIfNotNull(props, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
getMaxPollRecords());
         addPropertyIfNotNull(props, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
getInterceptorClasses());
         addPropertyIfNotNull(props, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
getAutoOffsetReset());
@@ -1377,6 +1380,20 @@ public class KafkaConfiguration implements Cloneable {
         this.pollTimeoutMs = pollTimeoutMs;
     }
 
+    public Long getMaxPollIntervalMs() {
+        return maxPollIntervalMs;
+    }
+
+    /**
+     * The maximum delay between invocations of poll() when using consumer 
group management.
+     * This places an upper bound on the amount of time that the consumer can 
be idle before fetching more records.
+     * If poll() is not called before expiration of this timeout, then the 
consumer is considered failed and the group
+     * will rebalance in order to reassign the partitions to another member.
+     */
+    public void setMaxPollIntervalMs(Long maxPollIntervalMs) {
+        this.maxPollIntervalMs = maxPollIntervalMs;
+    }
+
     public String getPartitionAssignor() {
         return partitionAssignor;
     }
diff --git 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 03c1f48..b29b6c0 100644
--- 
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -605,6 +605,15 @@ public class KafkaComponentConfiguration
          */
         private Long pollTimeoutMs = 5000L;
         /**
+         * The maximum delay between invocations of poll() when using consumer
+         * group management. This places an upper bound on the amount of time
+         * that the consumer can be idle before fetching more records. If 
poll()
+         * is not called before expiration of this timeout, then the consumer 
is
+         * considered failed and the group will rebalance in order to reassign
+         * the partitions to another member.
+         */
+        private Long maxPollIntervalMs;
+        /**
          * The class name of the partition assignment strategy that the client
          * will use to distribute partition ownership amongst consumer 
instances
          * when group management is used
@@ -1261,6 +1270,14 @@ public class KafkaComponentConfiguration
             this.pollTimeoutMs = pollTimeoutMs;
         }
 
+        public Long getMaxPollIntervalMs() {
+            return maxPollIntervalMs;
+        }
+
+        public void setMaxPollIntervalMs(Long maxPollIntervalMs) {
+            this.maxPollIntervalMs = maxPollIntervalMs;
+        }
+
         public String getPartitionAssignor() {
             return partitionAssignor;
         }

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].

Reply via email to