This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-22863 in repository https://gitbox.apache.org/repos/asf/camel.git
commit c92b5e558e6280eacd0178a2efbd2125ebf2d1ff Author: Andrea Cosentino <[email protected]> AuthorDate: Fri Jan 16 13:14:41 2026 +0100 CAMEL-22863 - Camel-CassandraQL: Add requestTimeout parameter to camel-cassandraql component Signed-off-by: Andrea Cosentino <[email protected]> --- .../org/apache/camel/catalog/components/cql.json | 37 ++-- .../cassandra/CassandraEndpointConfigurer.java | 6 + .../cassandra/CassandraEndpointUriFactory.java | 3 +- .../org/apache/camel/component/cassandra/cql.json | 37 ++-- .../component/cassandra/CassandraConsumer.java | 10 +- .../component/cassandra/CassandraEndpoint.java | 25 ++- .../component/cassandra/CassandraProducer.java | 22 ++- .../CassandraComponentRequestTimeoutIT.java | 201 +++++++++++++++++++++ .../dsl/CassandraEndpointBuilderFactory.java | 90 +++++++++ 9 files changed, 387 insertions(+), 44 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json index 13f497864b5b..3128a5e9f17b 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json @@ -49,23 +49,24 @@ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] "extraTypeCodecs": { "index": 14, "kind": "parameter", "displayName": "Extra Type Codecs", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a specific comma separated list of Extra Type codecs. Possible values are: BLOB_TO_ARRAY, BOOLEAN_LIST_TO_ARRAY, BYTE_LIST_TO_ARRAY, SHORT_LIST_TO_ARRAY, INT_LIST_TO_ARRAY, LONG_LIST_TO_ARRAY, FLOAT_LIST_T [...] "loadBalancingPolicyClass": { "index": 15, "kind": "parameter", "displayName": "Load Balancing Policy Class", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a specific LoadBalancingPolicyClass" }, - "resultSetConversionStrategy": { "index": 16, "kind": "parameter", "displayName": "Result Set Conversion Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.cassandra.ResultSetConversionStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom class that implements logic for converting ResultSet into message body ALL, ONE, LIMIT_10, LIMIT_100..." }, - "session": { "index": 17, "kind": "parameter", "displayName": "Session", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false, "autowired": false, "secret": false, "description": "To use the Session instance (you would normally not use this option)" }, - "backoffErrorThreshold": { "index": 18, "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." }, - "backoffIdleThreshold": { "index": 19, "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." }, - "backoffMultiplier": { "index": 20, "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is h [...] - "delay": { "index": 21, "kind": "parameter", "displayName": "Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "description": "Milliseconds before the next poll." }, - "greedy": { "index": 22, "kind": "parameter", "displayName": "Greedy", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages." }, - "initialDelay": { "index": 23, "kind": "parameter", "displayName": "Initial Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Milliseconds before the first poll starts." }, - "repeatCount": { "index": 24, "kind": "parameter", "displayName": "Repeat Count", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever." }, - "runLoggingLevel": { "index": 25, "kind": "parameter", "displayName": "Run Logging Level", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRACE", "description": "The consumer logs a start\/complete log line when it polls. This option allows you to configure the log [...] - "scheduledExecutorService": { "index": 26, "kind": "parameter", "displayName": "Scheduled Executor Service", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.ScheduledExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "Allows for configuring a custom\/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool." }, - "scheduler": { "index": 27, "kind": "parameter", "displayName": "Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.lang.Object", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "description": "To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler" }, - "schedulerProperties": { "index": 28, "kind": "parameter", "displayName": "Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. This i [...] - "startScheduler": { "index": 29, "kind": "parameter", "displayName": "Start Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether the scheduler should be auto started." }, - "timeUnit": { "index": 30, "kind": "parameter", "displayName": "Time Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and delay options." }, - "useFixedDelay": { "index": 31, "kind": "parameter", "displayName": "Use Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details." }, - "password": { "index": 32, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for session authentication" }, - "username": { "index": 33, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username for session authentication" } + "requestTimeout": { "index": 16, "kind": "parameter", "displayName": "Request Timeout", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "Request timeout in milliseconds. The timeout is applied to each individual query execution." }, + "resultSetConversionStrategy": { "index": 17, "kind": "parameter", "displayName": "Result Set Conversion Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.cassandra.ResultSetConversionStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom class that implements logic for converting ResultSet into message body ALL, ONE, LIMIT_10, LIMIT_100..." }, + "session": { "index": 18, "kind": "parameter", "displayName": "Session", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false, "autowired": false, "secret": false, "description": "To use the Session instance (you would normally not use this option)" }, + "backoffErrorThreshold": { "index": 19, "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." }, + "backoffIdleThreshold": { "index": 20, "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." }, + "backoffMultiplier": { "index": 21, "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is h [...] + "delay": { "index": 22, "kind": "parameter", "displayName": "Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "description": "Milliseconds before the next poll." }, + "greedy": { "index": 23, "kind": "parameter", "displayName": "Greedy", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages." }, + "initialDelay": { "index": 24, "kind": "parameter", "displayName": "Initial Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Milliseconds before the first poll starts." }, + "repeatCount": { "index": 25, "kind": "parameter", "displayName": "Repeat Count", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever." }, + "runLoggingLevel": { "index": 26, "kind": "parameter", "displayName": "Run Logging Level", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRACE", "description": "The consumer logs a start\/complete log line when it polls. This option allows you to configure the log [...] + "scheduledExecutorService": { "index": 27, "kind": "parameter", "displayName": "Scheduled Executor Service", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.ScheduledExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "Allows for configuring a custom\/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool." }, + "scheduler": { "index": 28, "kind": "parameter", "displayName": "Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.lang.Object", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "description": "To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler" }, + "schedulerProperties": { "index": 29, "kind": "parameter", "displayName": "Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. This i [...] + "startScheduler": { "index": 30, "kind": "parameter", "displayName": "Start Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether the scheduler should be auto started." }, + "timeUnit": { "index": 31, "kind": "parameter", "displayName": "Time Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and delay options." }, + "useFixedDelay": { "index": 32, "kind": "parameter", "displayName": "Use Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details." }, + "password": { "index": 33, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for session authentication" }, + "username": { "index": 34, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username for session authentication" } } } diff --git a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java index 0f8c86a4d593..2c8ac9dd06e2 100644 --- a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java +++ b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointConfigurer.java @@ -56,6 +56,8 @@ public class CassandraEndpointConfigurer extends PropertyConfigurerSupport imple case "prepareStatements": target.setPrepareStatements(property(camelContext, boolean.class, value)); return true; case "repeatcount": case "repeatCount": target.setRepeatCount(property(camelContext, long.class, value)); return true; + case "requesttimeout": + case "requestTimeout": target.setRequestTimeout(property(camelContext, java.lang.Long.class, value)); return true; case "resultsetconversionstrategy": case "resultSetConversionStrategy": target.setResultSetConversionStrategy(property(camelContext, org.apache.camel.component.cassandra.ResultSetConversionStrategy.class, value)); return true; case "runlogginglevel": @@ -115,6 +117,8 @@ public class CassandraEndpointConfigurer extends PropertyConfigurerSupport imple case "prepareStatements": return boolean.class; case "repeatcount": case "repeatCount": return long.class; + case "requesttimeout": + case "requestTimeout": return java.lang.Long.class; case "resultsetconversionstrategy": case "resultSetConversionStrategy": return org.apache.camel.component.cassandra.ResultSetConversionStrategy.class; case "runlogginglevel": @@ -175,6 +179,8 @@ public class CassandraEndpointConfigurer extends PropertyConfigurerSupport imple case "prepareStatements": return target.isPrepareStatements(); case "repeatcount": case "repeatCount": return target.getRepeatCount(); + case "requesttimeout": + case "requestTimeout": return target.getRequestTimeout(); case "resultsetconversionstrategy": case "resultSetConversionStrategy": return target.getResultSetConversionStrategy(); case "runlogginglevel": diff --git a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java index 8e4b813dd3da..8cd11ed34b57 100644 --- a/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java +++ b/components/camel-cassandraql/src/generated/java/org/apache/camel/component/cassandra/CassandraEndpointUriFactory.java @@ -23,7 +23,7 @@ public class CassandraEndpointUriFactory extends org.apache.camel.support.compon private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(34); + Set<String> props = new HashSet<>(35); props.add("backoffErrorThreshold"); props.add("backoffIdleThreshold"); props.add("backoffMultiplier"); @@ -47,6 +47,7 @@ public class CassandraEndpointUriFactory extends org.apache.camel.support.compon props.add("port"); props.add("prepareStatements"); props.add("repeatCount"); + props.add("requestTimeout"); props.add("resultSetConversionStrategy"); props.add("runLoggingLevel"); props.add("scheduledExecutorService"); diff --git a/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json b/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json index 13f497864b5b..3128a5e9f17b 100644 --- a/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json +++ b/components/camel-cassandraql/src/generated/resources/META-INF/org/apache/camel/component/cassandra/cql.json @@ -49,23 +49,24 @@ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] "extraTypeCodecs": { "index": 14, "kind": "parameter", "displayName": "Extra Type Codecs", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a specific comma separated list of Extra Type codecs. Possible values are: BLOB_TO_ARRAY, BOOLEAN_LIST_TO_ARRAY, BYTE_LIST_TO_ARRAY, SHORT_LIST_TO_ARRAY, INT_LIST_TO_ARRAY, LONG_LIST_TO_ARRAY, FLOAT_LIST_T [...] "loadBalancingPolicyClass": { "index": 15, "kind": "parameter", "displayName": "Load Balancing Policy Class", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "To use a specific LoadBalancingPolicyClass" }, - "resultSetConversionStrategy": { "index": 16, "kind": "parameter", "displayName": "Result Set Conversion Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.cassandra.ResultSetConversionStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom class that implements logic for converting ResultSet into message body ALL, ONE, LIMIT_10, LIMIT_100..." }, - "session": { "index": 17, "kind": "parameter", "displayName": "Session", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false, "autowired": false, "secret": false, "description": "To use the Session instance (you would normally not use this option)" }, - "backoffErrorThreshold": { "index": 18, "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." }, - "backoffIdleThreshold": { "index": 19, "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." }, - "backoffMultiplier": { "index": 20, "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is h [...] - "delay": { "index": 21, "kind": "parameter", "displayName": "Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "description": "Milliseconds before the next poll." }, - "greedy": { "index": 22, "kind": "parameter", "displayName": "Greedy", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages." }, - "initialDelay": { "index": 23, "kind": "parameter", "displayName": "Initial Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Milliseconds before the first poll starts." }, - "repeatCount": { "index": 24, "kind": "parameter", "displayName": "Repeat Count", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever." }, - "runLoggingLevel": { "index": 25, "kind": "parameter", "displayName": "Run Logging Level", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRACE", "description": "The consumer logs a start\/complete log line when it polls. This option allows you to configure the log [...] - "scheduledExecutorService": { "index": 26, "kind": "parameter", "displayName": "Scheduled Executor Service", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.ScheduledExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "Allows for configuring a custom\/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool." }, - "scheduler": { "index": 27, "kind": "parameter", "displayName": "Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.lang.Object", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "description": "To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler" }, - "schedulerProperties": { "index": 28, "kind": "parameter", "displayName": "Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. This i [...] - "startScheduler": { "index": 29, "kind": "parameter", "displayName": "Start Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether the scheduler should be auto started." }, - "timeUnit": { "index": 30, "kind": "parameter", "displayName": "Time Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and delay options." }, - "useFixedDelay": { "index": 31, "kind": "parameter", "displayName": "Use Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details." }, - "password": { "index": 32, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for session authentication" }, - "username": { "index": 33, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username for session authentication" } + "requestTimeout": { "index": 16, "kind": "parameter", "displayName": "Request Timeout", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "Request timeout in milliseconds. The timeout is applied to each individual query execution." }, + "resultSetConversionStrategy": { "index": 17, "kind": "parameter", "displayName": "Result Set Conversion Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.cassandra.ResultSetConversionStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom class that implements logic for converting ResultSet into message body ALL, ONE, LIMIT_10, LIMIT_100..." }, + "session": { "index": 18, "kind": "parameter", "displayName": "Session", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "com.datastax.oss.driver.api.core.CqlSession", "deprecated": false, "autowired": false, "secret": false, "description": "To use the Session instance (you would normally not use this option)" }, + "backoffErrorThreshold": { "index": 19, "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." }, + "backoffIdleThreshold": { "index": 20, "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." }, + "backoffMultiplier": { "index": 21, "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is h [...] + "delay": { "index": 22, "kind": "parameter", "displayName": "Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "description": "Milliseconds before the next poll." }, + "greedy": { "index": 23, "kind": "parameter", "displayName": "Greedy", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages." }, + "initialDelay": { "index": 24, "kind": "parameter", "displayName": "Initial Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Milliseconds before the first poll starts." }, + "repeatCount": { "index": 25, "kind": "parameter", "displayName": "Repeat Count", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever." }, + "runLoggingLevel": { "index": 26, "kind": "parameter", "displayName": "Run Logging Level", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRACE", "description": "The consumer logs a start\/complete log line when it polls. This option allows you to configure the log [...] + "scheduledExecutorService": { "index": 27, "kind": "parameter", "displayName": "Scheduled Executor Service", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.ScheduledExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "Allows for configuring a custom\/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool." }, + "scheduler": { "index": 28, "kind": "parameter", "displayName": "Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.lang.Object", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "description": "To use a cron scheduler from either camel-spring or camel-quartz component. Use value spring or quartz for built in scheduler" }, + "schedulerProperties": { "index": 29, "kind": "parameter", "displayName": "Scheduler Properties", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "scheduler.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "description": "To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. This i [...] + "startScheduler": { "index": 30, "kind": "parameter", "displayName": "Start Scheduler", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether the scheduler should be auto started." }, + "timeUnit": { "index": 31, "kind": "parameter", "displayName": "Time Unit", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "enum", "javaType": "java.util.concurrent.TimeUnit", "enum": [ "NANOSECONDS", "MICROSECONDS", "MILLISECONDS", "SECONDS", "MINUTES", "HOURS", "DAYS" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "MILLISECONDS", "description": "Time unit for initialDelay and delay options." }, + "useFixedDelay": { "index": 32, "kind": "parameter", "displayName": "Use Fixed Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details." }, + "password": { "index": 33, "kind": "parameter", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for session authentication" }, + "username": { "index": 34, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Username for session authentication" } } } diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java index e8216140ce41..ab5d3cd05f7a 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java @@ -16,9 +16,12 @@ */ package org.apache.camel.component.cassandra; +import java.time.Duration; + import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; @@ -58,7 +61,12 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw if (isPrepareStatements()) { resultSet = session.execute(preparedStatement.bind()); } else { - resultSet = session.execute(getEndpoint().getCql()); + SimpleStatement statement = SimpleStatement.newInstance(getEndpoint().getCql()); + Long requestTimeout = getEndpoint().getRequestTimeout(); + if (requestTimeout != null) { + statement = statement.setTimeout(Duration.ofMillis(requestTimeout)); + } + resultSet = session.execute(statement); } // Create message from ResultSet diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java index ba94a025e5ab..23c61b606a15 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraEndpoint.java @@ -17,6 +17,7 @@ package org.apache.camel.component.cassandra; import java.net.InetSocketAddress; +import java.time.Duration; import java.util.Arrays; import java.util.Map; @@ -29,6 +30,7 @@ import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; import org.apache.camel.Category; import org.apache.camel.Component; import org.apache.camel.Consumer; @@ -87,6 +89,9 @@ public class CassandraEndpoint extends ScheduledPollEndpoint implements Endpoint private ResultSetConversionStrategy resultSetConversionStrategy = ResultSetConversionStrategies.all(); @UriParam(label = "advanced") private String extraTypeCodecs; + @UriParam(label = "advanced", + description = "Request timeout in milliseconds. The timeout is applied to each individual query execution.") + private Long requestTimeout; public CassandraEndpoint(String endpointUri, Component component) { super(endpointUri, component); @@ -212,9 +217,12 @@ public class CassandraEndpoint extends ScheduledPollEndpoint implements Endpoint * Create and configure a Prepared CQL statement */ protected PreparedStatement prepareStatement(String cql) { - SimpleStatement statement = SimpleStatement.builder(cql) - .setConsistencyLevel(consistencyLevel).build(); - return getSessionHolder().getSession().prepare(statement); + SimpleStatementBuilder builder = SimpleStatement.builder(cql) + .setConsistencyLevel(consistencyLevel); + if (requestTimeout != null) { + builder.setTimeout(Duration.ofMillis(requestTimeout)); + } + return getSessionHolder().getSession().prepare(builder.build()); } /** @@ -415,4 +423,15 @@ public class CassandraEndpoint extends ScheduledPollEndpoint implements Endpoint public void setExtraTypeCodecs(String extraTypeCodecs) { this.extraTypeCodecs = extraTypeCodecs; } + + public Long getRequestTimeout() { + return requestTimeout; + } + + /** + * Request timeout in milliseconds. The timeout is applied to each individual query execution. + */ + public void setRequestTimeout(Long requestTimeout) { + this.requestTimeout = requestTimeout; + } } diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java index 12e508d7094b..f7e979a39feb 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraProducer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.cassandra; +import java.time.Duration; import java.util.Collection; import com.datastax.oss.driver.api.core.CqlSession; @@ -153,16 +154,31 @@ public class CassandraProducer extends DefaultProducer { throw new IllegalArgumentException("Invalid " + CassandraConstants.CQL_QUERY + " header"); } if (statement != null) { + statement = applyRequestTimeout(statement); resultSet = session.execute(statement); } else if (isEmpty(cqlParams)) { - resultSet = session.execute(cql); + SimpleStatement simpleStatement = SimpleStatement.newInstance(cql); + simpleStatement = applyRequestTimeout(simpleStatement); + resultSet = session.execute(simpleStatement); } else { - resultSet = session.execute( - SimpleStatement.builder(cql).addPositionalValues(cqlParams).build()); + SimpleStatement simpleStatement = SimpleStatement.builder(cql).addPositionalValues(cqlParams).build(); + simpleStatement = applyRequestTimeout(simpleStatement); + resultSet = session.execute(simpleStatement); } return resultSet; } + /** + * Apply request timeout to a SimpleStatement if configured + */ + private SimpleStatement applyRequestTimeout(SimpleStatement statement) { + Long requestTimeout = getEndpoint().getRequestTimeout(); + if (requestTimeout != null) { + return statement.setTimeout(Duration.ofMillis(requestTimeout)); + } + return statement; + } + @Override public void process(Exchange exchange) throws Exception { // copy the header of in message to the out message diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentRequestTimeoutIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentRequestTimeoutIT.java new file mode 100644 index 000000000000..c714bbf56b37 --- /dev/null +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentRequestTimeoutIT.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.cassandra.integration; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.CassandraEndpoint; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the requestTimeout parameter in the Cassandra component. + */ +public class CassandraComponentRequestTimeoutIT extends BaseCassandra { + + static final String CQL_INSERT = "insert into camel_user(login, first_name, last_name) values (?, ?, ?)"; + static final String CQL_SELECT = "select login, first_name, last_name from camel_user"; + + @Produce("direct:inputWithTimeout") + ProducerTemplate producerWithTimeout; + + @Produce("direct:inputWithTimeoutUnprepared") + ProducerTemplate producerWithTimeoutUnprepared; + + @Produce("direct:inputWithoutTimeout") + ProducerTemplate producerWithoutTimeout; + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + // Producer routes with different timeout configurations + from("direct:inputWithTimeout") + .toF("cql://%s/%s?cql=%s&requestTimeout=30000", getUrl(), KEYSPACE_NAME, CQL_INSERT); + + from("direct:inputWithTimeoutUnprepared") + .toF("cql://%s/%s?cql=%s&requestTimeout=30000&prepareStatements=false", getUrl(), KEYSPACE_NAME, + CQL_INSERT); + + from("direct:inputWithoutTimeout") + .toF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL_INSERT); + + // Consumer routes with different timeout configurations + fromF("cql://%s/%s?cql=%s&requestTimeout=30000", getUrl(), KEYSPACE_NAME, CQL_SELECT) + .to("mock:resultWithTimeout"); + + fromF("cql://%s/%s?cql=%s&requestTimeout=30000&prepareStatements=false", getUrl(), KEYSPACE_NAME, CQL_SELECT) + .to("mock:resultWithTimeoutUnprepared"); + } + }; + } + + /** + * Test that the requestTimeout parameter is correctly set on the endpoint. + */ + @Test + public void testEndpointRequestTimeoutConfiguration() { + CassandraEndpoint endpointWithTimeout = getMandatoryEndpoint( + String.format("cql://%s/%s?cql=%s&requestTimeout=30000", getUrl(), KEYSPACE_NAME, CQL_INSERT), + CassandraEndpoint.class); + assertEquals(Long.valueOf(30000), endpointWithTimeout.getRequestTimeout()); + + CassandraEndpoint endpointWithoutTimeout = getMandatoryEndpoint( + String.format("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL_INSERT), + CassandraEndpoint.class); + assertNull(endpointWithoutTimeout.getRequestTimeout()); + } + + /** + * Test producer with requestTimeout using prepared statements. + */ + @Test + public void testProducerWithTimeoutPreparedStatements() { + producerWithTimeout.requestBody(Arrays.asList("timeout_user1", "Timeout", "User1")); + + ResultSet resultSet = getSession() + .execute("select login, first_name, last_name from camel_user where login = ?", "timeout_user1"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Timeout", row.getString("first_name")); + assertEquals("User1", row.getString("last_name")); + } + + /** + * Test producer with requestTimeout using unprepared statements. + */ + @Test + public void testProducerWithTimeoutUnpreparedStatements() { + producerWithTimeoutUnprepared.requestBody(Arrays.asList("timeout_user2", "Timeout", "User2")); + + ResultSet resultSet = getSession() + .execute("select login, first_name, last_name from camel_user where login = ?", "timeout_user2"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("Timeout", row.getString("first_name")); + assertEquals("User2", row.getString("last_name")); + } + + /** + * Test producer without requestTimeout (uses driver default). + */ + @Test + public void testProducerWithoutTimeout() { + producerWithoutTimeout.requestBody(Arrays.asList("no_timeout_user", "NoTimeout", "User")); + + ResultSet resultSet = getSession() + .execute("select login, first_name, last_name from camel_user where login = ?", "no_timeout_user"); + Row row = resultSet.one(); + assertNotNull(row); + assertEquals("NoTimeout", row.getString("first_name")); + assertEquals("User", row.getString("last_name")); + } + + /** + * Test consumer with requestTimeout using prepared statements. + */ + @Test + public void testConsumerWithTimeoutPreparedStatements() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resultWithTimeout"); + mock.expectedMinimumMessageCount(1); + mock.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) { + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof List); + } + }); + mock.await(1, TimeUnit.SECONDS); + MockEndpoint.assertIsSatisfied(context); + } + + /** + * Test consumer with requestTimeout using unprepared statements. + */ + @Test + public void testConsumerWithTimeoutUnpreparedStatements() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:resultWithTimeoutUnprepared"); + mock.expectedMinimumMessageCount(1); + mock.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) { + Object body = exchange.getIn().getBody(); + assertTrue(body instanceof List); + } + }); + mock.await(1, TimeUnit.SECONDS); + MockEndpoint.assertIsSatisfied(context); + } + + /** + * Test that different timeout values can be configured. + */ + @Test + public void testDifferentTimeoutValues() { + // Test with 1 second timeout + CassandraEndpoint endpoint1s = getMandatoryEndpoint( + String.format("cql://%s/%s?cql=%s&requestTimeout=1000", getUrl(), KEYSPACE_NAME, CQL_SELECT), + CassandraEndpoint.class); + assertEquals(Long.valueOf(1000), endpoint1s.getRequestTimeout()); + + // Test with 60 second timeout + CassandraEndpoint endpoint60s = getMandatoryEndpoint( + String.format("cql://%s/%s?cql=%s&requestTimeout=60000", getUrl(), KEYSPACE_NAME, CQL_SELECT), + CassandraEndpoint.class); + assertEquals(Long.valueOf(60000), endpoint60s.getRequestTimeout()); + + // Test with 5 minute timeout + CassandraEndpoint endpoint5m = getMandatoryEndpoint( + String.format("cql://%s/%s?cql=%s&requestTimeout=300000", getUrl(), KEYSPACE_NAME, CQL_SELECT), + CassandraEndpoint.class); + assertEquals(Long.valueOf(300000), endpoint5m.getRequestTimeout()); + } +} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java index da4906164824..61db7b074530 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CassandraEndpointBuilderFactory.java @@ -834,6 +834,36 @@ public interface CassandraEndpointBuilderFactory { doSetProperty("loadBalancingPolicyClass", loadBalancingPolicyClass); return this; } + /** + * Request timeout in milliseconds. The timeout is applied to each + * individual query execution. + * + * The option is a: <code>java.lang.Long</code> type. + * + * Group: advanced + * + * @param requestTimeout the value to set + * @return the dsl builder + */ + default AdvancedCassandraEndpointConsumerBuilder requestTimeout(Long requestTimeout) { + doSetProperty("requestTimeout", requestTimeout); + return this; + } + /** + * Request timeout in milliseconds. The timeout is applied to each + * individual query execution. + * + * The option will be converted to a <code>java.lang.Long</code> type. + * + * Group: advanced + * + * @param requestTimeout the value to set + * @return the dsl builder + */ + default AdvancedCassandraEndpointConsumerBuilder requestTimeout(String requestTimeout) { + doSetProperty("requestTimeout", requestTimeout); + return this; + } /** * To use a custom class that implements logic for converting ResultSet * into message body ALL, ONE, LIMIT_10, LIMIT_100... @@ -1101,6 +1131,36 @@ public interface CassandraEndpointBuilderFactory { doSetProperty("loadBalancingPolicyClass", loadBalancingPolicyClass); return this; } + /** + * Request timeout in milliseconds. The timeout is applied to each + * individual query execution. + * + * The option is a: <code>java.lang.Long</code> type. + * + * Group: advanced + * + * @param requestTimeout the value to set + * @return the dsl builder + */ + default AdvancedCassandraEndpointProducerBuilder requestTimeout(Long requestTimeout) { + doSetProperty("requestTimeout", requestTimeout); + return this; + } + /** + * Request timeout in milliseconds. The timeout is applied to each + * individual query execution. + * + * The option will be converted to a <code>java.lang.Long</code> type. + * + * Group: advanced + * + * @param requestTimeout the value to set + * @return the dsl builder + */ + default AdvancedCassandraEndpointProducerBuilder requestTimeout(String requestTimeout) { + doSetProperty("requestTimeout", requestTimeout); + return this; + } /** * To use a custom class that implements logic for converting ResultSet * into message body ALL, ONE, LIMIT_10, LIMIT_100... @@ -1326,6 +1386,36 @@ public interface CassandraEndpointBuilderFactory { doSetProperty("loadBalancingPolicyClass", loadBalancingPolicyClass); return this; } + /** + * Request timeout in milliseconds. The timeout is applied to each + * individual query execution. + * + * The option is a: <code>java.lang.Long</code> type. + * + * Group: advanced + * + * @param requestTimeout the value to set + * @return the dsl builder + */ + default AdvancedCassandraEndpointBuilder requestTimeout(Long requestTimeout) { + doSetProperty("requestTimeout", requestTimeout); + return this; + } + /** + * Request timeout in milliseconds. The timeout is applied to each + * individual query execution. + * + * The option will be converted to a <code>java.lang.Long</code> type. + * + * Group: advanced + * + * @param requestTimeout the value to set + * @return the dsl builder + */ + default AdvancedCassandraEndpointBuilder requestTimeout(String requestTimeout) { + doSetProperty("requestTimeout", requestTimeout); + return this; + } /** * To use a custom class that implements logic for converting ResultSet * into message body ALL, ONE, LIMIT_10, LIMIT_100...
