This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e677a65 CAMEL-14968: Allow configuring the shutdown timeout for the Kafka
consumer/producer while waiting for its worker threads to shut down gracefully.
e677a65 is described below
commit e677a65f171b0a5f7083edc048c823923f75c071
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed May 6 14:00:28 2020 +0200
CAMEL-14968: Allow configuring the shutdown timeout for the Kafka
consumer/producer while waiting for its worker threads to shut down gracefully.
---
.../component/kafka/KafkaComponentConfigurer.java | 5 ++
.../component/kafka/KafkaEndpointConfigurer.java | 5 ++
.../org/apache/camel/component/kafka/kafka.json | 2 +
.../camel-kafka/src/main/docs/kafka-component.adoc | 6 +-
.../camel/component/kafka/KafkaConfiguration.java | 13 ++++
.../camel/component/kafka/KafkaConsumer.java | 10 ++-
.../camel/component/kafka/KafkaProducer.java | 4 +-
.../dsl/KafkaComponentBuilderFactory.java | 14 ++++
.../endpoint/dsl/KafkaEndpointBuilderFactory.java | 80 ++++++++++++++++++++++
9 files changed, 135 insertions(+), 4 deletions(-)
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 23e52e03..c36877a 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -169,6 +169,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "serializerClass":
getOrCreateConfiguration(target).setSerializerClass(property(camelContext,
java.lang.String.class, value)); return true;
case "sessiontimeoutms":
case "sessionTimeoutMs":
getOrCreateConfiguration(target).setSessionTimeoutMs(property(camelContext,
java.lang.Integer.class, value)); return true;
+ case "shutdowntimeout":
+ case "shutdownTimeout":
getOrCreateConfiguration(target).setShutdownTimeout(property(camelContext,
int.class, value)); return true;
case "specificavroreader":
case "specificAvroReader":
getOrCreateConfiguration(target).setSpecificAvroReader(property(camelContext,
boolean.class, value)); return true;
case "sslciphersuites":
@@ -294,6 +296,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
answer.put("sendBufferBytes", java.lang.Integer.class);
answer.put("serializerClass", java.lang.String.class);
answer.put("sessionTimeoutMs", java.lang.Integer.class);
+ answer.put("shutdownTimeout", int.class);
answer.put("specificAvroReader", boolean.class);
answer.put("sslCipherSuites", java.lang.String.class);
answer.put("sslContextParameters",
org.apache.camel.support.jsse.SSLContextParameters.class);
@@ -466,6 +469,8 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "serializerClass": return
getOrCreateConfiguration(target).getSerializerClass();
case "sessiontimeoutms":
case "sessionTimeoutMs": return
getOrCreateConfiguration(target).getSessionTimeoutMs();
+ case "shutdowntimeout":
+ case "shutdownTimeout": return
getOrCreateConfiguration(target).getShutdownTimeout();
case "specificavroreader":
case "specificAvroReader": return
getOrCreateConfiguration(target).isSpecificAvroReader();
case "sslciphersuites":
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index 7ed526a..ddae0f4 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -163,6 +163,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "serializerClass":
target.getConfiguration().setSerializerClass(property(camelContext,
java.lang.String.class, value)); return true;
case "sessiontimeoutms":
case "sessionTimeoutMs":
target.getConfiguration().setSessionTimeoutMs(property(camelContext,
java.lang.Integer.class, value)); return true;
+ case "shutdowntimeout":
+ case "shutdownTimeout":
target.getConfiguration().setShutdownTimeout(property(camelContext, int.class,
value)); return true;
case "specificavroreader":
case "specificAvroReader":
target.getConfiguration().setSpecificAvroReader(property(camelContext,
boolean.class, value)); return true;
case "sslciphersuites":
@@ -287,6 +289,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
answer.put("sendBufferBytes", java.lang.Integer.class);
answer.put("serializerClass", java.lang.String.class);
answer.put("sessionTimeoutMs", java.lang.Integer.class);
+ answer.put("shutdownTimeout", int.class);
answer.put("specificAvroReader", boolean.class);
answer.put("sslCipherSuites", java.lang.String.class);
answer.put("sslContextParameters",
org.apache.camel.support.jsse.SSLContextParameters.class);
@@ -460,6 +463,8 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "serializerClass": return
target.getConfiguration().getSerializerClass();
case "sessiontimeoutms":
case "sessionTimeoutMs": return
target.getConfiguration().getSessionTimeoutMs();
+ case "shutdowntimeout":
+ case "shutdownTimeout": return
target.getConfiguration().getShutdownTimeout();
case "specificavroreader":
case "specificAvroReader": return
target.getConfiguration().isSpecificAvroReader();
case "sslciphersuites":
diff --git
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 756303b..239eb77 100644
---
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -27,6 +27,7 @@
"configuration": { "kind": "property", "displayName": "Configuration",
"group": "common", "label": "", "required": false, "type": "object",
"javaType": "org.apache.camel.component.kafka.KafkaConfiguration",
"deprecated": false, "secret": false, "description": "Allows to pre-configure
the Kafka component with common options that the endpoints will reuse." },
"headerFilterStrategy": { "kind": "property", "displayName": "Header
Filter Strategy", "group": "common", "label": "common", "required": false,
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy",
"deprecated": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "To use a custom HeaderFilterStrategy to filter
header to and from Camel message." },
"reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect
Backoff Max Ms", "group": "common", "label": "common", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"secret": false, "defaultValue": "1000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The maximum amount of time in milliseconds to
wait when reconnecting to a broker that has repea [...]
+ "shutdownTimeout": { "kind": "property", "displayName": "Shutdown
Timeout", "group": "common", "label": "common", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "30000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Timeout in milli seconds to wait gracefully
for the consumer or producer to shutdown and terminate its worker threads." },
"allowManualCommit": { "kind": "property", "displayName": "Allow Manual
Commit", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then an instance of Kafk [...]
"autoCommitEnable": { "kind": "property", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret":
false, "defaultValue": "true", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched by the consumer. This [...]
"autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit
Interval Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The frequency in ms that the consumer offsets
are committed to zookeeper." },
@@ -125,6 +126,7 @@
"clientId": { "kind": "parameter", "displayName": "Client Id", "group":
"common", "label": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The client id is a user-specified string sent
in each request to help trace calls. It should logically identify the
application making the request." },
"headerFilterStrategy": { "kind": "parameter", "displayName": "Header
Filter Strategy", "group": "common", "label": "common", "required": false,
"type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy",
"deprecated": false, "secret": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "To use a custom HeaderFilterStrategy to filter
header to and from Camel message." },
"reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect
Backoff Max Ms", "group": "common", "label": "common", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"secret": false, "defaultValue": "1000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The maximum amount of time in milliseconds to
wait when reconnecting to a broker that has repe [...]
+ "shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown
Timeout", "group": "common", "label": "common", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "secret": false,
"defaultValue": "30000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Timeout in milli seconds to wait gracefully
for the consumer or producer to shutdown and terminate its worker threads." },
"allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual
Commit", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then an instance of Kaf [...]
"autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "secret":
false, "defaultValue": "true", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched by the consumer. Thi [...]
"autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit
Interval Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"secret": false, "defaultValue": "5000", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "The frequency in ms that the consumer offsets
are committed to zookeeper." },
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index bf5c294..e00506f5 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -40,7 +40,7 @@ kafka:topic[?options]
// component options: START
-The Kafka component supports 96 options, which are listed below.
+The Kafka component supports 97 options, which are listed below.
@@ -53,6 +53,7 @@ The Kafka component supports 96 options, which are listed
below.
| *configuration* (common) | Allows to pre-configure the Kafka component with
common options that the endpoints will reuse. | | KafkaConfiguration
| *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to
filter header to and from Camel message. | | HeaderFilterStrategy
| *reconnectBackoffMaxMs* (common) | The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has repeatedly failed
to connect. If provided, the backoff per host will increase exponentially for
each consecutive connection failure, up to this maximum. After calculating the
backoff increase, 20% random jitter is added to avoid connection storms. | 1000
| Integer
+| *shutdownTimeout* (common) | Timeout in milliseconds to wait gracefully for
the consumer or producer to shut down and terminate its worker threads. | 30000
| int
| *allowManualCommit* (consumer) | Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then an instance of
KafkaManualCommit is stored on the Exchange message header, which allows end
users to access this API and perform manual offset commits via the Kafka
consumer. | false | boolean
| *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper
the offset of messages already fetched by the consumer. This committed offset
will be used when the process fails as the position from which the new consumer
will begin. | true | Boolean
| *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer
offsets are committed to zookeeper. | 5000 | Integer
@@ -167,7 +168,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (96 parameters):
+=== Query Parameters (97 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -178,6 +179,7 @@ with the following path and query parameters:
| *clientId* (common) | The client id is a user-specified string sent in each
request to help trace calls. It should logically identify the application
making the request. | | String
| *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to
filter header to and from Camel message. | | HeaderFilterStrategy
| *reconnectBackoffMaxMs* (common) | The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has repeatedly failed
to connect. If provided, the backoff per host will increase exponentially for
each consecutive connection failure, up to this maximum. After calculating the
backoff increase, 20% random jitter is added to avoid connection storms. | 1000
| Integer
+| *shutdownTimeout* (common) | Timeout in milliseconds to wait gracefully for
the consumer or producer to shut down and terminate its worker threads. | 30000
| int
| *allowManualCommit* (consumer) | Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then an instance of
KafkaManualCommit is stored on the Exchange message header, which allows end
users to access this API and perform manual offset commits via the Kafka
consumer. | false | boolean
| *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper
the offset of messages already fetched by the consumer. This committed offset
will be used when the process fails as the position from which the new consumer
will begin. | true | Boolean
| *autoCommitIntervalMs* (consumer) | The frequency in ms that the consumer
offsets are committed to zookeeper. | 5000 | Integer
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 5f99fb5..0fd1c64 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -319,6 +319,8 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
// Additional properties
@UriParam(label = "common", prefix = "additionalProperties.", multiValue =
true)
private Map<String, Object> additionalProperties = new HashMap<>();
+ @UriParam(label = "common", defaultValue = "30000")
+ private int shutdownTimeout = 30000;
public KafkaConfiguration() {
}
@@ -661,6 +663,17 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
this.allowManualCommit = allowManualCommit;
}
+ public int getShutdownTimeout() {
+ return shutdownTimeout;
+ }
+
+ /**
+ * Timeout in milliseconds to wait gracefully for the consumer or
+ * producer to shut down and terminate its worker threads.
+ */
+ public void setShutdownTimeout(int shutdownTimeout) {
+ this.shutdownTimeout = shutdownTimeout;
+ }
+
public StateRepository<String, String> getOffsetRepository() {
return offsetRepository;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 622f0c7..5bad75b 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -78,6 +79,11 @@ public class KafkaConsumer extends DefaultConsumer {
}
}
+ @Override
+ public KafkaEndpoint getEndpoint() {
+ return (KafkaEndpoint) super.getEndpoint();
+ }
+
Properties getProps() {
Properties props =
endpoint.getConfiguration().createConsumerProperties();
endpoint.updateClassProperties(props);
@@ -142,7 +148,9 @@ public class KafkaConsumer extends DefaultConsumer {
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() !=
null) {
-
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
+ int timeout =
getEndpoint().getConfiguration().getShutdownTimeout();
+ LOG.debug("Shutting down Kafka consumer worker threads with
timeout {} millis", timeout);
+
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor,
timeout);
} else {
executor.shutdownNow();
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index dd15bba..12c5239 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -137,7 +137,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
if (shutdownWorkerPool && workerPool != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+ int timeout = endpoint.getConfiguration().getShutdownTimeout();
+ LOG.debug("Shutting down Kafka producer worker threads with
timeout {} millis", timeout);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool,
timeout);
workerPool = null;
}
}
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 38de2e6..b3943fd 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -138,6 +138,19 @@ public interface KafkaComponentBuilderFactory {
return this;
}
/**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaComponentBuilder shutdownTimeout(int shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
* Whether to allow doing manual commits via KafkaManualCommit. If this
* option is enabled then an instance of KafkaManualCommit is stored on
* the Exchange message header, which allows end users to access this
@@ -1555,6 +1568,7 @@ public interface KafkaComponentBuilderFactory {
case "configuration": ((KafkaComponent)
component).setConfiguration((org.apache.camel.component.kafka.KafkaConfiguration)
value); return true;
case "headerFilterStrategy":
getOrCreateConfiguration((KafkaComponent)
component).setHeaderFilterStrategy((org.apache.camel.spi.HeaderFilterStrategy)
value); return true;
case "reconnectBackoffMaxMs":
getOrCreateConfiguration((KafkaComponent)
component).setReconnectBackoffMaxMs((java.lang.Integer) value); return true;
+ case "shutdownTimeout": getOrCreateConfiguration((KafkaComponent)
component).setShutdownTimeout((int) value); return true;
case "allowManualCommit":
getOrCreateConfiguration((KafkaComponent)
component).setAllowManualCommit((boolean) value); return true;
case "autoCommitEnable": getOrCreateConfiguration((KafkaComponent)
component).setAutoCommitEnable((java.lang.Boolean) value); return true;
case "autoCommitIntervalMs":
getOrCreateConfiguration((KafkaComponent)
component).setAutoCommitIntervalMs((java.lang.Integer) value); return true;
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index 0d0941d..5a83015 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -173,6 +173,33 @@ public interface KafkaEndpointBuilderFactory {
return this;
}
/**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaEndpointConsumerBuilder shutdownTimeout(int
shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaEndpointConsumerBuilder shutdownTimeout(
+ String shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
* Whether to allow doing manual commits via KafkaManualCommit. If this
* option is enabled then an instance of KafkaManualCommit is stored on
* the Exchange message header, which allows end users to access this
@@ -1597,6 +1624,33 @@ public interface KafkaEndpointBuilderFactory {
return this;
}
/**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaEndpointProducerBuilder shutdownTimeout(int
shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaEndpointProducerBuilder shutdownTimeout(
+ String shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
* The total bytes of memory the producer can use to buffer records
* waiting to be sent to the server. If records are sent faster than
* they can be delivered to the server the producer will either block
or
@@ -3190,6 +3244,32 @@ public interface KafkaEndpointBuilderFactory {
return this;
}
/**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaEndpointBuilder shutdownTimeout(int shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
+ * Timeout in milli seconds to wait gracefully for the consumer or
+ * producer to shutdown and terminate its worker threads.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: common
+ */
+ default KafkaEndpointBuilder shutdownTimeout(String shutdownTimeout) {
+ doSetProperty("shutdownTimeout", shutdownTimeout);
+ return this;
+ }
+ /**
* URL of the Confluent Platform schema registry servers to use. The
* format is host1:port1,host2:port2. This is known as
* schema.registry.url in the Confluent Platform documentation. This