This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 67dd417915b2 CAMEL-22863 - Camel-CassandraQL: Add requestTimeout
parameter to camel-cassandraql component (#20858)
67dd417915b2 is described below
commit 67dd417915b2808f5a03ea730d0da875e2d80541
Author: Andrea Cosentino <[email protected]>
AuthorDate: Fri Jan 16 14:12:37 2026 +0100
CAMEL-22863 - Camel-CassandraQL: Add requestTimeout parameter to
camel-cassandraql component (#20858)
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../org/apache/camel/catalog/components/cql.json | 37 ++--
.../cassandra/CassandraEndpointConfigurer.java | 6 +
.../cassandra/CassandraEndpointUriFactory.java | 3 +-
.../org/apache/camel/component/cassandra/cql.json | 37 ++--
.../component/cassandra/CassandraConsumer.java | 10 +-
.../component/cassandra/CassandraEndpoint.java | 25 ++-
.../component/cassandra/CassandraProducer.java | 22 ++-
.../CassandraComponentRequestTimeoutIT.java | 200 +++++++++++++++++++++
.../dsl/CassandraEndpointBuilderFactory.java | 90 ++++++++++
9 files changed, 386 insertions(+), 44 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
index 13f497864b5b..e1acd928a7c4 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
@@ -49,23 +49,24 @@
"lazyStartProducer": { "index": 13, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
"extraTypeCodecs": { "index": 14, "kind": "parameter", "displayName":
"Extra Type Codecs", "group": "advanced", "label": "advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "To use a specific comma
separated list of Extra Type codecs. Possible values are: BLOB_TO_ARRAY,
BOOLEAN_LIST_TO_ARRAY, BYTE_LIST_TO_ARRAY, SHORT_LIST_TO_ARRAY,
INT_LIST_TO_ARRAY, LONG_LIST_TO_ARRAY, FLOAT_LIST_T [...]
"loadBalancingPolicyClass": { "index": 15, "kind": "parameter",
"displayName": "Load Balancing Policy Class", "group": "advanced", "label":
"advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "To use a specific LoadBalancingPolicyClass" },
- "resultSetConversionStrategy": { "index": 16, "kind": "parameter",
"displayName": "Result Set Conversion Strategy", "group": "advanced", "label":
"advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.cassandra.ResultSetConversionStrategy",
"deprecated": false, "autowired": false, "secret": false, "description": "To
use a custom class that implements logic for converting ResultSet into message
body ALL, ONE, LIMIT_10, LIMIT_100..." },
- "session": { "index": 17, "kind": "parameter", "displayName": "Session",
"group": "advanced", "label": "advanced", "required": false, "type": "object",
"javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false,
"autowired": false, "secret": false, "description": "To use the Session
instance (you would normally not use this option)" },
- "backoffErrorThreshold": { "index": 18, "kind": "parameter",
"displayName": "Backoff Error Threshold", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "description": "The
number of subsequent error polls (failed due some error) that should happen
before the backoffMultipler should kick-in." },
- "backoffIdleThreshold": { "index": 19, "kind": "parameter", "displayName":
"Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "The number of subsequent
idle polls that should happen before the backoffMultipler should kick-in." },
- "backoffMultiplier": { "index": 20, "kind": "parameter", "displayName":
"Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "To let the scheduled
polling consumer backoff if there has been a number of subsequent idles\/errors
in a row. The multiplier is then the number of polls that will be skipped
before the next actual attempt is h [...]
- "delay": { "index": 21, "kind": "parameter", "displayName": "Delay",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 500, "description": "Milliseconds before the
next poll." },
- "greedy": { "index": 22, "kind": "parameter", "displayName": "Greedy",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If greedy is enabled,
then the ScheduledPollConsumer will run immediately again, if the previous run
polled 1 or more messages." },
- "initialDelay": { "index": 23, "kind": "parameter", "displayName":
"Initial Delay", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1000, "description":
"Milliseconds before the first poll starts." },
- "repeatCount": { "index": 24, "kind": "parameter", "displayName": "Repeat
Count", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 0, "description": "Specifies a maximum limit
of number of fires. So if you set it to 1, the scheduler will only fire once.
If you set it to 5, it will only fire five times. A value of zero or negative
means fire forever." },
- "runLoggingLevel": { "index": 25, "kind": "parameter", "displayName": "Run
Logging Level", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel",
"enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "TRACE",
"description": "The consumer logs a start\/complete log line when it polls.
This option allows you to configure the log [...]
- "scheduledExecutorService": { "index": 26, "kind": "parameter",
"displayName": "Scheduled Executor Service", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "object", "javaType":
"java.util.concurrent.ScheduledExecutorService", "deprecated": false,
"autowired": false, "secret": false, "description": "Allows for configuring a
custom\/shared thread pool to use for the consumer. By default each consumer
has its own single threaded thread pool." },
- "scheduler": { "index": 27, "kind": "parameter", "displayName":
"Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "object", "javaType": "java.lang.Object", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "none", "description": "To
use a cron scheduler from either camel-spring or camel-quartz component. Use
value spring or quartz for built in scheduler" },
- "schedulerProperties": { "index": 28, "kind": "parameter", "displayName":
"Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "object", "javaType":
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.",
"multiValue": true, "deprecated": false, "autowired": false, "secret": false,
"description": "To configure additional properties when using a custom
scheduler or any of the Quartz, Spring based scheduler. This i [...]
- "startScheduler": { "index": 29, "kind": "parameter", "displayName":
"Start Scheduler", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"description": "Whether the scheduler should be auto started." },
- "timeUnit": { "index": 30, "kind": "parameter", "displayName": "Time
Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [
"NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS",
"DAYS" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and
delay options." },
- "useFixedDelay": { "index": 31, "kind": "parameter", "displayName": "Use
Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in
JDK for details." },
- "password": { "index": 32, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for session authentication" },
- "username": { "index": 33, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username for session authentication" }
+ "requestTimeout": { "index": 16, "kind": "parameter", "displayName":
"Request Timeout", "group": "advanced", "label": "advanced", "required": false,
"type": "duration", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "description": "Request timeout in milliseconds. The timeout
is applied to each individual query execution." },
+ "resultSetConversionStrategy": { "index": 17, "kind": "parameter",
"displayName": "Result Set Conversion Strategy", "group": "advanced", "label":
"advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.cassandra.ResultSetConversionStrategy",
"deprecated": false, "autowired": false, "secret": false, "description": "To
use a custom class that implements logic for converting ResultSet into message
body ALL, ONE, LIMIT_10, LIMIT_100..." },
+ "session": { "index": 18, "kind": "parameter", "displayName": "Session",
"group": "advanced", "label": "advanced", "required": false, "type": "object",
"javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false,
"autowired": false, "secret": false, "description": "To use the Session
instance (you would normally not use this option)" },
+ "backoffErrorThreshold": { "index": 19, "kind": "parameter",
"displayName": "Backoff Error Threshold", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "description": "The
number of subsequent error polls (failed due some error) that should happen
before the backoffMultipler should kick-in." },
+ "backoffIdleThreshold": { "index": 20, "kind": "parameter", "displayName":
"Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "The number of subsequent
idle polls that should happen before the backoffMultipler should kick-in." },
+ "backoffMultiplier": { "index": 21, "kind": "parameter", "displayName":
"Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "To let the scheduled
polling consumer backoff if there has been a number of subsequent idles\/errors
in a row. The multiplier is then the number of polls that will be skipped
before the next actual attempt is h [...]
+ "delay": { "index": 22, "kind": "parameter", "displayName": "Delay",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 500, "description": "Milliseconds before the
next poll." },
+ "greedy": { "index": 23, "kind": "parameter", "displayName": "Greedy",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If greedy is enabled,
then the ScheduledPollConsumer will run immediately again, if the previous run
polled 1 or more messages." },
+ "initialDelay": { "index": 24, "kind": "parameter", "displayName":
"Initial Delay", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1000, "description":
"Milliseconds before the first poll starts." },
+ "repeatCount": { "index": 25, "kind": "parameter", "displayName": "Repeat
Count", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 0, "description": "Specifies a maximum limit
of number of fires. So if you set it to 1, the scheduler will only fire once.
If you set it to 5, it will only fire five times. A value of zero or negative
means fire forever." },
+ "runLoggingLevel": { "index": 26, "kind": "parameter", "displayName": "Run
Logging Level", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel",
"enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "TRACE",
"description": "The consumer logs a start\/complete log line when it polls.
This option allows you to configure the log [...]
+ "scheduledExecutorService": { "index": 27, "kind": "parameter",
"displayName": "Scheduled Executor Service", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "object", "javaType":
"java.util.concurrent.ScheduledExecutorService", "deprecated": false,
"autowired": false, "secret": false, "description": "Allows for configuring a
custom\/shared thread pool to use for the consumer. By default each consumer
has its own single threaded thread pool." },
+ "scheduler": { "index": 28, "kind": "parameter", "displayName":
"Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "object", "javaType": "java.lang.Object", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "none", "description": "To
use a cron scheduler from either camel-spring or camel-quartz component. Use
value spring or quartz for built in scheduler" },
+ "schedulerProperties": { "index": 29, "kind": "parameter", "displayName":
"Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "object", "javaType":
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.",
"multiValue": true, "deprecated": false, "autowired": false, "secret": false,
"description": "To configure additional properties when using a custom
scheduler or any of the Quartz, Spring based scheduler. This i [...]
+ "startScheduler": { "index": 30, "kind": "parameter", "displayName":
"Start Scheduler", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"description": "Whether the scheduler should be auto started." },
+ "timeUnit": { "index": 31, "kind": "parameter", "displayName": "Time
Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [
"NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS",
"DAYS" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and
delay options." },
+ "useFixedDelay": { "index": 32, "kind": "parameter", "displayName": "Use
Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in
JDK for details." },
+ "password": { "index": 33, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for session authentication" },
+ "username": { "index": 34, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username for session authentication" }
}
}
diff --git
a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java
b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java
index 0f8c86a4d593..ade2be92f05a 100644
---
a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java
+++
b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java
@@ -56,6 +56,8 @@ public class CassandraEndpointConfigurer extends
PropertyConfigurerSupport imple
case "prepareStatements":
target.setPrepareStatements(property(camelContext, boolean.class, value));
return true;
case "repeatcount":
case "repeatCount": target.setRepeatCount(property(camelContext,
long.class, value)); return true;
+ case "requesttimeout":
+ case "requestTimeout": target.setRequestTimeout(property(camelContext,
int.class, value)); return true;
case "resultsetconversionstrategy":
case "resultSetConversionStrategy":
target.setResultSetConversionStrategy(property(camelContext,
org.apache.camel.component.cassandra.ResultSetConversionStrategy.class,
value)); return true;
case "runlogginglevel":
@@ -115,6 +117,8 @@ public class CassandraEndpointConfigurer extends
PropertyConfigurerSupport imple
case "prepareStatements": return boolean.class;
case "repeatcount":
case "repeatCount": return long.class;
+ case "requesttimeout":
+ case "requestTimeout": return int.class;
case "resultsetconversionstrategy":
case "resultSetConversionStrategy": return
org.apache.camel.component.cassandra.ResultSetConversionStrategy.class;
case "runlogginglevel":
@@ -175,6 +179,8 @@ public class CassandraEndpointConfigurer extends
PropertyConfigurerSupport imple
case "prepareStatements": return target.isPrepareStatements();
case "repeatcount":
case "repeatCount": return target.getRepeatCount();
+ case "requesttimeout":
+ case "requestTimeout": return target.getRequestTimeout();
case "resultsetconversionstrategy":
case "resultSetConversionStrategy": return
target.getResultSetConversionStrategy();
case "runlogginglevel":
diff --git
a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java
b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java
index 8e4b813dd3da..8cd11ed34b57 100644
---
a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java
+++
b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class CassandraEndpointUriFactory extends
org.apache.camel.support.compon
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(34);
+ Set<String> props = new HashSet<>(35);
props.add("backoffErrorThreshold");
props.add("backoffIdleThreshold");
props.add("backoffMultiplier");
@@ -47,6 +47,7 @@ public class CassandraEndpointUriFactory extends
org.apache.camel.support.compon
props.add("port");
props.add("prepareStatements");
props.add("repeatCount");
+ props.add("requestTimeout");
props.add("resultSetConversionStrategy");
props.add("runLoggingLevel");
props.add("scheduledExecutorService");
diff --git
a/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json
b/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json
index 13f497864b5b..e1acd928a7c4 100644
---
a/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json
+++
b/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json
@@ -49,23 +49,24 @@
"lazyStartProducer": { "index": 13, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
"extraTypeCodecs": { "index": 14, "kind": "parameter", "displayName":
"Extra Type Codecs", "group": "advanced", "label": "advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "To use a specific comma
separated list of Extra Type codecs. Possible values are: BLOB_TO_ARRAY,
BOOLEAN_LIST_TO_ARRAY, BYTE_LIST_TO_ARRAY, SHORT_LIST_TO_ARRAY,
INT_LIST_TO_ARRAY, LONG_LIST_TO_ARRAY, FLOAT_LIST_T [...]
"loadBalancingPolicyClass": { "index": 15, "kind": "parameter",
"displayName": "Load Balancing Policy Class", "group": "advanced", "label":
"advanced", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "To use a specific LoadBalancingPolicyClass" },
- "resultSetConversionStrategy": { "index": 16, "kind": "parameter",
"displayName": "Result Set Conversion Strategy", "group": "advanced", "label":
"advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.cassandra.ResultSetConversionStrategy",
"deprecated": false, "autowired": false, "secret": false, "description": "To
use a custom class that implements logic for converting ResultSet into message
body ALL, ONE, LIMIT_10, LIMIT_100..." },
- "session": { "index": 17, "kind": "parameter", "displayName": "Session",
"group": "advanced", "label": "advanced", "required": false, "type": "object",
"javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false,
"autowired": false, "secret": false, "description": "To use the Session
instance (you would normally not use this option)" },
- "backoffErrorThreshold": { "index": 18, "kind": "parameter",
"displayName": "Backoff Error Threshold", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "description": "The
number of subsequent error polls (failed due some error) that should happen
before the backoffMultipler should kick-in." },
- "backoffIdleThreshold": { "index": 19, "kind": "parameter", "displayName":
"Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "The number of subsequent
idle polls that should happen before the backoffMultipler should kick-in." },
- "backoffMultiplier": { "index": 20, "kind": "parameter", "displayName":
"Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "To let the scheduled
polling consumer backoff if there has been a number of subsequent idles\/errors
in a row. The multiplier is then the number of polls that will be skipped
before the next actual attempt is h [...]
- "delay": { "index": 21, "kind": "parameter", "displayName": "Delay",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 500, "description": "Milliseconds before the
next poll." },
- "greedy": { "index": 22, "kind": "parameter", "displayName": "Greedy",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If greedy is enabled,
then the ScheduledPollConsumer will run immediately again, if the previous run
polled 1 or more messages." },
- "initialDelay": { "index": 23, "kind": "parameter", "displayName":
"Initial Delay", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1000, "description":
"Milliseconds before the first poll starts." },
- "repeatCount": { "index": 24, "kind": "parameter", "displayName": "Repeat
Count", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 0, "description": "Specifies a maximum limit
of number of fires. So if you set it to 1, the scheduler will only fire once.
If you set it to 5, it will only fire five times. A value of zero or negative
means fire forever." },
- "runLoggingLevel": { "index": 25, "kind": "parameter", "displayName": "Run
Logging Level", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel",
"enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "TRACE",
"description": "The consumer logs a start\/complete log line when it polls.
This option allows you to configure the log [...]
- "scheduledExecutorService": { "index": 26, "kind": "parameter",
"displayName": "Scheduled Executor Service", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "object", "javaType":
"java.util.concurrent.ScheduledExecutorService", "deprecated": false,
"autowired": false, "secret": false, "description": "Allows for configuring a
custom\/shared thread pool to use for the consumer. By default each consumer
has its own single threaded thread pool." },
- "scheduler": { "index": 27, "kind": "parameter", "displayName":
"Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "object", "javaType": "java.lang.Object", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "none", "description": "To
use a cron scheduler from either camel-spring or camel-quartz component. Use
value spring or quartz for built in scheduler" },
- "schedulerProperties": { "index": 28, "kind": "parameter", "displayName":
"Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "object", "javaType":
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.",
"multiValue": true, "deprecated": false, "autowired": false, "secret": false,
"description": "To configure additional properties when using a custom
scheduler or any of the Quartz, Spring based scheduler. This i [...]
- "startScheduler": { "index": 29, "kind": "parameter", "displayName":
"Start Scheduler", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"description": "Whether the scheduler should be auto started." },
- "timeUnit": { "index": 30, "kind": "parameter", "displayName": "Time
Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [
"NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS",
"DAYS" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and
delay options." },
- "useFixedDelay": { "index": 31, "kind": "parameter", "displayName": "Use
Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in
JDK for details." },
- "password": { "index": 32, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for session authentication" },
- "username": { "index": 33, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username for session authentication" }
+ "requestTimeout": { "index": 16, "kind": "parameter", "displayName":
"Request Timeout", "group": "advanced", "label": "advanced", "required": false,
"type": "duration", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "description": "Request timeout in milliseconds. The timeout
is applied to each individual query execution." },
+ "resultSetConversionStrategy": { "index": 17, "kind": "parameter",
"displayName": "Result Set Conversion Strategy", "group": "advanced", "label":
"advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.component.cassandra.ResultSetConversionStrategy",
"deprecated": false, "autowired": false, "secret": false, "description": "To
use a custom class that implements logic for converting ResultSet into message
body ALL, ONE, LIMIT_10, LIMIT_100..." },
+ "session": { "index": 18, "kind": "parameter", "displayName": "Session",
"group": "advanced", "label": "advanced", "required": false, "type": "object",
"javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false,
"autowired": false, "secret": false, "description": "To use the Session
instance (you would normally not use this option)" },
+ "backoffErrorThreshold": { "index": 19, "kind": "parameter",
"displayName": "Backoff Error Threshold", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "description": "The
number of subsequent error polls (failed due some error) that should happen
before the backoffMultipler should kick-in." },
+ "backoffIdleThreshold": { "index": 20, "kind": "parameter", "displayName":
"Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "The number of subsequent
idle polls that should happen before the backoffMultipler should kick-in." },
+ "backoffMultiplier": { "index": 21, "kind": "parameter", "displayName":
"Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"autowired": false, "secret": false, "description": "To let the scheduled
polling consumer backoff if there has been a number of subsequent idles\/errors
in a row. The multiplier is then the number of polls that will be skipped
before the next actual attempt is h [...]
+ "delay": { "index": 22, "kind": "parameter", "displayName": "Delay",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 500, "description": "Milliseconds before the
next poll." },
+ "greedy": { "index": 23, "kind": "parameter", "displayName": "Greedy",
"group": "scheduler", "label": "consumer,scheduler", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "If greedy is enabled,
then the ScheduledPollConsumer will run immediately again, if the previous run
polled 1 or more messages." },
+ "initialDelay": { "index": 24, "kind": "parameter", "displayName":
"Initial Delay", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "integer", "javaType": "long", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": 1000, "description":
"Milliseconds before the first poll starts." },
+ "repeatCount": { "index": 25, "kind": "parameter", "displayName": "Repeat
Count", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 0, "description": "Specifies a maximum limit
of number of fires. So if you set it to 1, the scheduler will only fire once.
If you set it to 5, it will only fire five times. A value of zero or negative
means fire forever." },
+ "runLoggingLevel": { "index": 26, "kind": "parameter", "displayName": "Run
Logging Level", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel",
"enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated":
false, "autowired": false, "secret": false, "defaultValue": "TRACE",
"description": "The consumer logs a start\/complete log line when it polls.
This option allows you to configure the log [...]
+ "scheduledExecutorService": { "index": 27, "kind": "parameter",
"displayName": "Scheduled Executor Service", "group": "scheduler", "label":
"consumer,scheduler", "required": false, "type": "object", "javaType":
"java.util.concurrent.ScheduledExecutorService", "deprecated": false,
"autowired": false, "secret": false, "description": "Allows for configuring a
custom\/shared thread pool to use for the consumer. By default each consumer
has its own single threaded thread pool." },
+ "scheduler": { "index": 28, "kind": "parameter", "displayName":
"Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "object", "javaType": "java.lang.Object", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "none", "description": "To
use a cron scheduler from either camel-spring or camel-quartz component. Use
value spring or quartz for built in scheduler" },
+ "schedulerProperties": { "index": 29, "kind": "parameter", "displayName":
"Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "object", "javaType":
"java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.",
"multiValue": true, "deprecated": false, "autowired": false, "secret": false,
"description": "To configure additional properties when using a custom
scheduler or any of the Quartz, Spring based scheduler. This i [...]
+ "startScheduler": { "index": 30, "kind": "parameter", "displayName":
"Start Scheduler", "group": "scheduler", "label": "consumer,scheduler",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"description": "Whether the scheduler should be auto started." },
+ "timeUnit": { "index": 31, "kind": "parameter", "displayName": "Time
Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false,
"type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [
"NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS",
"DAYS" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and
delay options." },
+ "useFixedDelay": { "index": 32, "kind": "parameter", "displayName": "Use
Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in
JDK for details." },
+ "password": { "index": 33, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for session authentication" },
+ "username": { "index": 34, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username for session authentication" }
}
}
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index e8216140ce41..8cd2a0c45af1 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -16,9 +16,12 @@
*/
package org.apache.camel.component.cassandra;
+import java.time.Duration;
+
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
@@ -58,7 +61,12 @@ public class CassandraConsumer extends ScheduledPollConsumer
implements ResumeAw
if (isPrepareStatements()) {
resultSet = session.execute(preparedStatement.bind());
} else {
- resultSet = session.execute(getEndpoint().getCql());
+ SimpleStatement statement =
SimpleStatement.newInstance(getEndpoint().getCql());
+ int requestTimeout = getEndpoint().getRequestTimeout();
+ if (requestTimeout > 0) {
+ statement =
statement.setTimeout(Duration.ofMillis(requestTimeout));
+ }
+ resultSet = session.execute(statement);
}
// Create message from ResultSet
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
index ba94a025e5ab..555c1831532c 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.cassandra;
import java.net.InetSocketAddress;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
@@ -29,6 +30,7 @@ import
com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
@@ -87,6 +89,9 @@ public class CassandraEndpoint extends ScheduledPollEndpoint
implements Endpoint
private ResultSetConversionStrategy resultSetConversionStrategy =
ResultSetConversionStrategies.all();
@UriParam(label = "advanced")
private String extraTypeCodecs;
+ @UriParam(label = "advanced", javaType = "java.time.Duration",
+ description = "Request timeout in milliseconds. The timeout is
applied to each individual query execution.")
+ private int requestTimeout;
public CassandraEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
@@ -212,9 +217,12 @@ public class CassandraEndpoint extends
ScheduledPollEndpoint implements Endpoint
* Create and configure a Prepared CQL statement
*/
protected PreparedStatement prepareStatement(String cql) {
- SimpleStatement statement = SimpleStatement.builder(cql)
- .setConsistencyLevel(consistencyLevel).build();
- return getSessionHolder().getSession().prepare(statement);
+ SimpleStatementBuilder builder = SimpleStatement.builder(cql)
+ .setConsistencyLevel(consistencyLevel);
+ if (requestTimeout > 0) {
+ builder.setTimeout(Duration.ofMillis(requestTimeout));
+ }
+ return getSessionHolder().getSession().prepare(builder.build());
}
/**
@@ -415,4 +423,15 @@ public class CassandraEndpoint extends
ScheduledPollEndpoint implements Endpoint
public void setExtraTypeCodecs(String extraTypeCodecs) {
this.extraTypeCodecs = extraTypeCodecs;
}
+
+ public int getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
individual query execution.
+ */
+ public void setRequestTimeout(int requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
}
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
index 12e508d7094b..39582ecbec55 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.cassandra;
+import java.time.Duration;
import java.util.Collection;
import com.datastax.oss.driver.api.core.CqlSession;
@@ -153,16 +154,31 @@ public class CassandraProducer extends DefaultProducer {
throw new IllegalArgumentException("Invalid " +
CassandraConstants.CQL_QUERY + " header");
}
if (statement != null) {
+ statement = applyRequestTimeout(statement);
resultSet = session.execute(statement);
} else if (isEmpty(cqlParams)) {
- resultSet = session.execute(cql);
+ SimpleStatement simpleStatement = SimpleStatement.newInstance(cql);
+ simpleStatement = applyRequestTimeout(simpleStatement);
+ resultSet = session.execute(simpleStatement);
} else {
- resultSet = session.execute(
-
SimpleStatement.builder(cql).addPositionalValues(cqlParams).build());
+ SimpleStatement simpleStatement =
SimpleStatement.builder(cql).addPositionalValues(cqlParams).build();
+ simpleStatement = applyRequestTimeout(simpleStatement);
+ resultSet = session.execute(simpleStatement);
}
return resultSet;
}
+ /**
+ * Apply request timeout to a SimpleStatement if configured
+ */
+ private SimpleStatement applyRequestTimeout(SimpleStatement statement) {
+ int requestTimeout = getEndpoint().getRequestTimeout();
+ if (requestTimeout > 0) {
+ return statement.setTimeout(Duration.ofMillis(requestTimeout));
+ }
+ return statement;
+ }
+
@Override
public void process(Exchange exchange) throws Exception {
// copy the header of in message to the out message
diff --git
a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentRequestTimeoutIT.java
b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentRequestTimeoutIT.java
new file mode 100644
index 000000000000..459ef525e366
--- /dev/null
+++
b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentRequestTimeoutIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.cassandra.integration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraEndpoint;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the requestTimeout parameter in the Cassandra
component.
+ */
+public class CassandraComponentRequestTimeoutIT extends BaseCassandra {
+
+ static final String CQL_INSERT = "insert into camel_user(login,
first_name, last_name) values (?, ?, ?)";
+ static final String CQL_SELECT = "select login, first_name, last_name from
camel_user";
+
+ @Produce("direct:inputWithTimeout")
+ ProducerTemplate producerWithTimeout;
+
+ @Produce("direct:inputWithTimeoutUnprepared")
+ ProducerTemplate producerWithTimeoutUnprepared;
+
+ @Produce("direct:inputWithoutTimeout")
+ ProducerTemplate producerWithoutTimeout;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // Producer routes with different timeout configurations
+ from("direct:inputWithTimeout")
+ .toF("cql://%s/%s?cql=%s&requestTimeout=30000",
getUrl(), KEYSPACE_NAME, CQL_INSERT);
+
+ from("direct:inputWithTimeoutUnprepared")
+
.toF("cql://%s/%s?cql=%s&requestTimeout=30000&prepareStatements=false",
getUrl(), KEYSPACE_NAME,
+ CQL_INSERT);
+
+ from("direct:inputWithoutTimeout")
+ .toF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME,
CQL_INSERT);
+
+ // Consumer routes with different timeout configurations
+ fromF("cql://%s/%s?cql=%s&requestTimeout=30000", getUrl(),
KEYSPACE_NAME, CQL_SELECT)
+ .to("mock:resultWithTimeout");
+
+
fromF("cql://%s/%s?cql=%s&requestTimeout=30000&prepareStatements=false",
getUrl(), KEYSPACE_NAME, CQL_SELECT)
+ .to("mock:resultWithTimeoutUnprepared");
+ }
+ };
+ }
+
+ /**
+ * Test that the requestTimeout parameter is correctly set on the endpoint.
+ */
+ @Test
+ public void testEndpointRequestTimeoutConfiguration() {
+ CassandraEndpoint endpointWithTimeout = getMandatoryEndpoint(
+ String.format("cql://%s/%s?cql=%s&requestTimeout=30000",
getUrl(), KEYSPACE_NAME, CQL_INSERT),
+ CassandraEndpoint.class);
+ assertEquals(30000, endpointWithTimeout.getRequestTimeout());
+
+ CassandraEndpoint endpointWithoutTimeout = getMandatoryEndpoint(
+ String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME,
CQL_INSERT),
+ CassandraEndpoint.class);
+ assertEquals(0, endpointWithoutTimeout.getRequestTimeout());
+ }
+
+ /**
+ * Test producer with requestTimeout using prepared statements.
+ */
+ @Test
+ public void testProducerWithTimeoutPreparedStatements() {
+ producerWithTimeout.requestBody(Arrays.asList("timeout_user1",
"Timeout", "User1"));
+
+ ResultSet resultSet = getSession()
+ .execute("select login, first_name, last_name from camel_user
where login = ?", "timeout_user1");
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Timeout", row.getString("first_name"));
+ assertEquals("User1", row.getString("last_name"));
+ }
+
+ /**
+ * Test producer with requestTimeout using unprepared statements.
+ */
+ @Test
+ public void testProducerWithTimeoutUnpreparedStatements() {
+
producerWithTimeoutUnprepared.requestBody(Arrays.asList("timeout_user2",
"Timeout", "User2"));
+
+ ResultSet resultSet = getSession()
+ .execute("select login, first_name, last_name from camel_user
where login = ?", "timeout_user2");
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("Timeout", row.getString("first_name"));
+ assertEquals("User2", row.getString("last_name"));
+ }
+
+ /**
+ * Test producer without requestTimeout (uses driver default).
+ */
+ @Test
+ public void testProducerWithoutTimeout() {
+ producerWithoutTimeout.requestBody(Arrays.asList("no_timeout_user",
"NoTimeout", "User"));
+
+ ResultSet resultSet = getSession()
+ .execute("select login, first_name, last_name from camel_user
where login = ?", "no_timeout_user");
+ Row row = resultSet.one();
+ assertNotNull(row);
+ assertEquals("NoTimeout", row.getString("first_name"));
+ assertEquals("User", row.getString("last_name"));
+ }
+
+ /**
+ * Test consumer with requestTimeout using prepared statements.
+ */
+ @Test
+ public void testConsumerWithTimeoutPreparedStatements() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:resultWithTimeout");
+ mock.expectedMinimumMessageCount(1);
+ mock.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ Object body = exchange.getIn().getBody();
+ assertTrue(body instanceof List);
+ }
+ });
+ mock.await(1, TimeUnit.SECONDS);
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ /**
+ * Test consumer with requestTimeout using unprepared statements.
+ */
+ @Test
+ public void testConsumerWithTimeoutUnpreparedStatements() throws Exception
{
+ MockEndpoint mock =
getMockEndpoint("mock:resultWithTimeoutUnprepared");
+ mock.expectedMinimumMessageCount(1);
+ mock.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ Object body = exchange.getIn().getBody();
+ assertTrue(body instanceof List);
+ }
+ });
+ mock.await(1, TimeUnit.SECONDS);
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ /**
+ * Test that different timeout values can be configured.
+ */
+ @Test
+ public void testDifferentTimeoutValues() {
+ // Test with 1 second timeout
+ CassandraEndpoint endpoint1s = getMandatoryEndpoint(
+ String.format("cql://%s/%s?cql=%s&requestTimeout=1000",
getUrl(), KEYSPACE_NAME, CQL_SELECT),
+ CassandraEndpoint.class);
+ assertEquals(1000, endpoint1s.getRequestTimeout());
+
+ // Test with 60 second timeout
+ CassandraEndpoint endpoint60s = getMandatoryEndpoint(
+ String.format("cql://%s/%s?cql=%s&requestTimeout=60000",
getUrl(), KEYSPACE_NAME, CQL_SELECT),
+ CassandraEndpoint.class);
+ assertEquals(60000, endpoint60s.getRequestTimeout());
+
+ // Test with 5 minute timeout
+ CassandraEndpoint endpoint5m = getMandatoryEndpoint(
+ String.format("cql://%s/%s?cql=%s&requestTimeout=300000",
getUrl(), KEYSPACE_NAME, CQL_SELECT),
+ CassandraEndpoint.class);
+ assertEquals(300000, endpoint5m.getRequestTimeout());
+ }
+}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java
index da4906164824..f7c6e7145fe6 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java
@@ -834,6 +834,36 @@ public interface CassandraEndpointBuilderFactory {
doSetProperty("loadBalancingPolicyClass",
loadBalancingPolicyClass);
return this;
}
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
+ * individual query execution.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Group: advanced
+ *
+ * @param requestTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedCassandraEndpointConsumerBuilder requestTimeout(int
requestTimeout) {
+ doSetProperty("requestTimeout", requestTimeout);
+ return this;
+ }
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
+ * individual query execution.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Group: advanced
+ *
+ * @param requestTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedCassandraEndpointConsumerBuilder requestTimeout(String
requestTimeout) {
+ doSetProperty("requestTimeout", requestTimeout);
+ return this;
+ }
/**
* To use a custom class that implements logic for converting ResultSet
* into message body ALL, ONE, LIMIT_10, LIMIT_100...
@@ -1101,6 +1131,36 @@ public interface CassandraEndpointBuilderFactory {
doSetProperty("loadBalancingPolicyClass",
loadBalancingPolicyClass);
return this;
}
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
+ * individual query execution.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Group: advanced
+ *
+ * @param requestTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedCassandraEndpointProducerBuilder requestTimeout(int
requestTimeout) {
+ doSetProperty("requestTimeout", requestTimeout);
+ return this;
+ }
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
+ * individual query execution.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Group: advanced
+ *
+ * @param requestTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedCassandraEndpointProducerBuilder requestTimeout(String
requestTimeout) {
+ doSetProperty("requestTimeout", requestTimeout);
+ return this;
+ }
/**
* To use a custom class that implements logic for converting ResultSet
* into message body ALL, ONE, LIMIT_10, LIMIT_100...
@@ -1326,6 +1386,36 @@ public interface CassandraEndpointBuilderFactory {
doSetProperty("loadBalancingPolicyClass",
loadBalancingPolicyClass);
return this;
}
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
+ * individual query execution.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Group: advanced
+ *
+ * @param requestTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedCassandraEndpointBuilder requestTimeout(int
requestTimeout) {
+ doSetProperty("requestTimeout", requestTimeout);
+ return this;
+ }
+ /**
+ * Request timeout in milliseconds. The timeout is applied to each
+ * individual query execution.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Group: advanced
+ *
+ * @param requestTimeout the value to set
+ * @return the dsl builder
+ */
+ default AdvancedCassandraEndpointBuilder requestTimeout(String
requestTimeout) {
+ doSetProperty("requestTimeout", requestTimeout);
+ return this;
+ }
/**
* To use a custom class that implements logic for converting ResultSet
* into message body ALL, ONE, LIMIT_10, LIMIT_100...