This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new cccdd7f (chores) camel-kafka: cleanup isAutoCommitEnable handling
cccdd7f is described below
commit cccdd7f208ae963fbe7bb02c9932b56d5aa1b17b
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jan 18 16:14:08 2022 +0100
(chores) camel-kafka: cleanup isAutoCommitEnable handling
---
.../resources/org/apache/camel/catalog/components/kafka.json | 4 ++--
.../apache/camel/component/kafka/KafkaComponentConfigurer.java | 6 +++---
.../apache/camel/component/kafka/KafkaEndpointConfigurer.java | 6 +++---
.../resources/org/apache/camel/component/kafka/kafka.json | 4 ++--
.../org/apache/camel/component/kafka/KafkaConfiguration.java | 6 +++---
.../org/apache/camel/component/kafka/KafkaFetchRecords.java | 10 ++--------
.../component/kafka/consumer/support/KafkaRecordProcessor.java | 4 ++--
.../kafka/consumer/support/KafkaRecordProcessorFacade.java | 5 +----
.../builder/component/dsl/KafkaComponentBuilderFactory.java | 7 +++----
.../builder/endpoint/dsl/KafkaEndpointBuilderFactory.java | 8 ++++----
10 files changed, 25 insertions(+), 35 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
index 5cd0250..4a3781f 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json
@@ -30,7 +30,7 @@
"reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect
Backoff Max Ms", "group": "common", "label": "common", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "1000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The maximum amount of
time in milliseconds to wait when reconnecting to a b [...]
"shutdownTimeout": { "kind": "property", "displayName": "Shutdown
Timeout", "group": "common", "label": "common", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 30000, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Timeout in milliseconds to wait gracefully for
the consumer or producer to shutdown and terminate its wo [...]
"allowManualCommit": { "kind": "property", "displayName": "Allow Manual
Commit", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then [...]
- "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "true", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched b [...]
+ "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched by the consum [...]
"autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit
Interval Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "5000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The frequency in ms that
the consumer offsets are committed to zookeeper." },
"autoCommitOnStop": { "kind": "property", "displayName": "Auto Commit On
Stop", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"sync", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to perform an explicit auto commit whe
[...]
"autoOffsetReset": { "kind": "property", "displayName": "Auto Offset
Reset", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "latest", "earliest",
"none" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "latest", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "What to do when there is no initial offset
[...]
@@ -136,7 +136,7 @@
"reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect
Backoff Max Ms", "group": "common", "label": "common", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "1000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The maximum amount of
time in milliseconds to wait when reconnecting to a [...]
"shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown
Timeout", "group": "common", "label": "common", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 30000, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Timeout in milliseconds to wait gracefully for
the consumer or producer to shutdown and terminate its w [...]
"allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual
Commit", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled the [...]
- "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "true", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched [...]
+ "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched by the consu [...]
"autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit
Interval Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "5000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The frequency in ms that
the consumer offsets are committed to zookeeper." },
"autoCommitOnStop": { "kind": "parameter", "displayName": "Auto Commit On
Stop", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"sync", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to perform an explicit auto commit wh
[...]
"autoOffsetReset": { "kind": "parameter", "displayName": "Auto Offset
Reset", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "latest", "earliest",
"none" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "latest", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "What to do when there is no initial offse [...]
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 1d9b659..d8f6a58 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -33,7 +33,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "allowmanualcommit":
case "allowManualCommit":
getOrCreateConfiguration(target).setAllowManualCommit(property(camelContext,
boolean.class, value)); return true;
case "autocommitenable":
- case "autoCommitEnable":
getOrCreateConfiguration(target).setAutoCommitEnable(property(camelContext,
java.lang.Boolean.class, value)); return true;
+ case "autoCommitEnable":
getOrCreateConfiguration(target).setAutoCommitEnable(property(camelContext,
boolean.class, value)); return true;
case "autocommitintervalms":
case "autoCommitIntervalMs":
getOrCreateConfiguration(target).setAutoCommitIntervalMs(property(camelContext,
java.lang.Integer.class, value)); return true;
case "autocommitonstop":
@@ -247,7 +247,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "allowmanualcommit":
case "allowManualCommit": return boolean.class;
case "autocommitenable":
- case "autoCommitEnable": return java.lang.Boolean.class;
+ case "autoCommitEnable": return boolean.class;
case "autocommitintervalms":
case "autoCommitIntervalMs": return java.lang.Integer.class;
case "autocommitonstop":
@@ -457,7 +457,7 @@ public class KafkaComponentConfigurer extends
PropertyConfigurerSupport implemen
case "allowmanualcommit":
case "allowManualCommit": return
getOrCreateConfiguration(target).isAllowManualCommit();
case "autocommitenable":
- case "autoCommitEnable": return
getOrCreateConfiguration(target).getAutoCommitEnable();
+ case "autoCommitEnable": return
getOrCreateConfiguration(target).isAutoCommitEnable();
case "autocommitintervalms":
case "autoCommitIntervalMs": return
getOrCreateConfiguration(target).getAutoCommitIntervalMs();
case "autocommitonstop":
diff --git
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
index f7e64e2..7c3ef61 100644
---
a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
+++
b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java
@@ -26,7 +26,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "allowmanualcommit":
case "allowManualCommit":
target.getConfiguration().setAllowManualCommit(property(camelContext,
boolean.class, value)); return true;
case "autocommitenable":
- case "autoCommitEnable":
target.getConfiguration().setAutoCommitEnable(property(camelContext,
java.lang.Boolean.class, value)); return true;
+ case "autoCommitEnable":
target.getConfiguration().setAutoCommitEnable(property(camelContext,
boolean.class, value)); return true;
case "autocommitintervalms":
case "autoCommitIntervalMs":
target.getConfiguration().setAutoCommitIntervalMs(property(camelContext,
java.lang.Integer.class, value)); return true;
case "autocommitonstop":
@@ -232,7 +232,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "allowmanualcommit":
case "allowManualCommit": return boolean.class;
case "autocommitenable":
- case "autoCommitEnable": return java.lang.Boolean.class;
+ case "autoCommitEnable": return boolean.class;
case "autocommitintervalms":
case "autoCommitIntervalMs": return java.lang.Integer.class;
case "autocommitonstop":
@@ -439,7 +439,7 @@ public class KafkaEndpointConfigurer extends
PropertyConfigurerSupport implement
case "allowmanualcommit":
case "allowManualCommit": return
target.getConfiguration().isAllowManualCommit();
case "autocommitenable":
- case "autoCommitEnable": return
target.getConfiguration().getAutoCommitEnable();
+ case "autoCommitEnable": return
target.getConfiguration().isAutoCommitEnable();
case "autocommitintervalms":
case "autoCommitIntervalMs": return
target.getConfiguration().getAutoCommitIntervalMs();
case "autocommitonstop":
diff --git
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 5cd0250..4a3781f 100644
---
a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++
b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -30,7 +30,7 @@
"reconnectBackoffMaxMs": { "kind": "property", "displayName": "Reconnect
Backoff Max Ms", "group": "common", "label": "common", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "1000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The maximum amount of
time in milliseconds to wait when reconnecting to a b [...]
"shutdownTimeout": { "kind": "property", "displayName": "Shutdown
Timeout", "group": "common", "label": "common", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 30000, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Timeout in milliseconds to wait gracefully for
the consumer or producer to shutdown and terminate its wo [...]
"allowManualCommit": { "kind": "property", "displayName": "Allow Manual
Commit", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then [...]
- "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "true", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched b [...]
+ "autoCommitEnable": { "kind": "property", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched by the consum [...]
"autoCommitIntervalMs": { "kind": "property", "displayName": "Auto Commit
Interval Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "5000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The frequency in ms that
the consumer offsets are committed to zookeeper." },
"autoCommitOnStop": { "kind": "property", "displayName": "Auto Commit On
Stop", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"sync", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to perform an explicit auto commit whe
[...]
"autoOffsetReset": { "kind": "property", "displayName": "Auto Offset
Reset", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "latest", "earliest",
"none" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "latest", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "What to do when there is no initial offset
[...]
@@ -136,7 +136,7 @@
"reconnectBackoffMaxMs": { "kind": "parameter", "displayName": "Reconnect
Backoff Max Ms", "group": "common", "label": "common", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "1000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The maximum amount of
time in milliseconds to wait when reconnecting to a [...]
"shutdownTimeout": { "kind": "parameter", "displayName": "Shutdown
Timeout", "group": "common", "label": "common", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 30000, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Timeout in milliseconds to wait gracefully for
the consumer or producer to shutdown and terminate its w [...]
"allowManualCommit": { "kind": "parameter", "displayName": "Allow Manual
Commit", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled the [...]
- "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "true", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched [...]
+ "autoCommitEnable": { "kind": "parameter", "displayName": "Auto Commit
Enable", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "If true, periodically commit to ZooKeeper the
offset of messages already fetched by the consu [...]
"autoCommitIntervalMs": { "kind": "parameter", "displayName": "Auto Commit
Interval Ms", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "java.lang.Integer", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "5000",
"configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration",
"configurationField": "configuration", "description": "The frequency in ms that
the consumer offsets are committed to zookeeper." },
"autoCommitOnStop": { "kind": "parameter", "displayName": "Auto Commit On
Stop", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "sync", "async", "none" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"sync", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "Whether to perform an explicit auto commit wh
[...]
"autoOffsetReset": { "kind": "parameter", "displayName": "Auto Offset
Reset", "group": "consumer", "label": "consumer", "required": false, "type":
"string", "javaType": "java.lang.String", "enum": [ "latest", "earliest",
"none" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "latest", "configurationClass":
"org.apache.camel.component.kafka.KafkaConfiguration", "configurationField":
"configuration", "description": "What to do when there is no initial offse [...]
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 16f35e0..1e5d833 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -134,7 +134,7 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
// Consumer configuration properties
@UriParam(label = "consumer", defaultValue = "true")
- private Boolean autoCommitEnable = true;
+ private boolean autoCommitEnable = true;
@UriParam(label = "consumer")
private boolean allowManualCommit;
@UriParam(label = "consumer", defaultValue = "sync", enums =
"sync,async,none")
@@ -659,7 +659,7 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
return offsetRepository == null ? autoCommitEnable : false;
}
- public Boolean getAutoCommitEnable() {
+ public boolean getAutoCommitEnable() {
return autoCommitEnable;
}
@@ -667,7 +667,7 @@ public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware
* If true, periodically commit to ZooKeeper the offset of messages
already fetched by the consumer. This committed
* offset will be used when the process fails as the position from which
the new consumer will begin.
*/
- public void setAutoCommitEnable(Boolean autoCommitEnable) {
+ public void setAutoCommitEnable(boolean autoCommitEnable) {
this.autoCommitEnable = autoCommitEnable;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 0d05e78..4fd454f 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -165,8 +165,7 @@ class KafkaFetchRecords implements Runnable {
}
KafkaRecordProcessorFacade recordProcessorFacade = new
KafkaRecordProcessorFacade(
- kafkaConsumer,
- lastProcessedOffset, threadId, isAutoCommitEnabled(),
consumer, asyncCommits);
+ kafkaConsumer, lastProcessedOffset, threadId, consumer,
asyncCommits);
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
while (isKafkaConsumerRunnable() && isRetrying() && isConnected())
{
@@ -278,7 +277,7 @@ class KafkaFetchRecords implements Runnable {
private void commit() {
processAsyncCommits();
- if (isAutoCommitEnabled()) {
+ if
(kafkaConsumer.getEndpoint().getConfiguration().isAutoCommitEnable()) {
if
("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
{
LOG.info("Auto commitAsync on stop {} from {}", threadId,
getPrintableTopic());
consumer.commitAsync();
@@ -419,11 +418,6 @@ class KafkaFetchRecords implements Runnable {
safeStop();
}
- private boolean isAutoCommitEnabled() {
- return
kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable() != null
- &&
kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitEnable();
- }
-
public boolean isConnected() {
return connected;
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 70eb6f8..698b464 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -54,11 +54,11 @@ public class KafkaRecordProcessor {
private final String threadId;
private final ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits;
- public KafkaRecordProcessor(boolean autoCommitEnabled, KafkaConfiguration
configuration,
+ public KafkaRecordProcessor(KafkaConfiguration configuration,
Processor processor, Consumer<?, ?> consumer,
KafkaManualCommitFactory manualCommitFactory,
String threadId,
ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
- this.autoCommitEnabled = autoCommitEnabled;
+ this.autoCommitEnabled = configuration.isAutoCommitEnable();
this.configuration = configuration;
this.processor = processor;
this.consumer = consumer;
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 2fc8043..585aee6 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -43,14 +43,13 @@ public class KafkaRecordProcessorFacade {
private final KafkaRecordProcessor kafkaRecordProcessor;
public KafkaRecordProcessorFacade(KafkaConsumer camelKafkaConsumer,
Map<String, Long> lastProcessedOffset, String threadId,
- boolean autoCommit,
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
this.camelKafkaConsumer = camelKafkaConsumer;
this.lastProcessedOffset = lastProcessedOffset;
this.threadId = threadId;
- kafkaRecordProcessor = buildKafkaRecordProcessor(autoCommit, consumer,
asyncCommits);
+ kafkaRecordProcessor = buildKafkaRecordProcessor(consumer,
asyncCommits);
}
private boolean isStopping() {
@@ -138,11 +137,9 @@ public class KafkaRecordProcessorFacade {
}
private KafkaRecordProcessor buildKafkaRecordProcessor(
- boolean autoCommit,
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
ConcurrentLinkedQueue<KafkaAsyncManualCommit> asyncCommits) {
return new KafkaRecordProcessor(
- autoCommit,
camelKafkaConsumer.getEndpoint().getConfiguration(),
camelKafkaConsumer.getProcessor(),
consumer,
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index a6f2815..2ce40d2 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -198,7 +198,7 @@ public interface KafkaComponentBuilderFactory {
* when the process fails as the position from which the new consumer
* will begin.
*
- * The option is a: <code>java.lang.Boolean</code> type.
+ * The option is a: <code>boolean</code> type.
*
* Default: true
* Group: consumer
@@ -206,8 +206,7 @@ public interface KafkaComponentBuilderFactory {
* @param autoCommitEnable the value to set
* @return the dsl builder
*/
- default KafkaComponentBuilder autoCommitEnable(
- java.lang.Boolean autoCommitEnable) {
+ default KafkaComponentBuilder autoCommitEnable(boolean
autoCommitEnable) {
doSetProperty("autoCommitEnable", autoCommitEnable);
return this;
}
@@ -2016,7 +2015,7 @@ public interface KafkaComponentBuilderFactory {
case "reconnectBackoffMaxMs":
getOrCreateConfiguration((KafkaComponent)
component).setReconnectBackoffMaxMs((java.lang.Integer) value); return true;
case "shutdownTimeout": getOrCreateConfiguration((KafkaComponent)
component).setShutdownTimeout((int) value); return true;
case "allowManualCommit":
getOrCreateConfiguration((KafkaComponent)
component).setAllowManualCommit((boolean) value); return true;
- case "autoCommitEnable": getOrCreateConfiguration((KafkaComponent)
component).setAutoCommitEnable((java.lang.Boolean) value); return true;
+ case "autoCommitEnable": getOrCreateConfiguration((KafkaComponent)
component).setAutoCommitEnable((boolean) value); return true;
case "autoCommitIntervalMs":
getOrCreateConfiguration((KafkaComponent)
component).setAutoCommitIntervalMs((java.lang.Integer) value); return true;
case "autoCommitOnStop": getOrCreateConfiguration((KafkaComponent)
component).setAutoCommitOnStop((java.lang.String) value); return true;
case "autoOffsetReset": getOrCreateConfiguration((KafkaComponent)
component).setAutoOffsetReset((java.lang.String) value); return true;
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
index d7854f7..b1ffc6a 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java
@@ -281,7 +281,7 @@ public interface KafkaEndpointBuilderFactory {
* when the process fails as the position from which the new consumer
* will begin.
*
- * The option is a: <code>java.lang.Boolean</code> type.
+ * The option is a: <code>boolean</code> type.
*
* Default: true
* Group: consumer
@@ -290,7 +290,7 @@ public interface KafkaEndpointBuilderFactory {
* @return the dsl builder
*/
default KafkaEndpointConsumerBuilder autoCommitEnable(
- Boolean autoCommitEnable) {
+ boolean autoCommitEnable) {
doSetProperty("autoCommitEnable", autoCommitEnable);
return this;
}
@@ -300,8 +300,8 @@ public interface KafkaEndpointBuilderFactory {
* when the process fails as the position from which the new consumer
* will begin.
*
- * The option will be converted to a
- * <code>java.lang.Boolean</code> type.
+ * The option will be converted to a <code>boolean</code>
+ * type.
*
* Default: true
* Group: consumer