This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch sf in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6052389c1a162ade71ad1a817d01830fb6303334 Author: Claus Ibsen <[email protected]> AuthorDate: Sat Oct 11 11:20:04 2025 +0200 CAMEL-22332: camel-salesforce - Add worker thread pool for consumers. --- .../camel/catalog/components/salesforce.json | 25 +++--- .../salesforce/SalesforceEndpointConfigurer.java | 18 +++++ .../salesforce/SalesforceEndpointUriFactory.java | 5 +- .../camel/component/salesforce/salesforce.json | 25 +++--- .../src/main/docs/salesforce-component.adoc | 9 +++ .../component/salesforce/PubSubApiConsumer.java | 28 ++++++- .../component/salesforce/SalesforceEndpoint.java | 49 +++++++++++- .../component/salesforce/StreamingApiConsumer.java | 25 +++++- .../dsl/SalesforceEndpointBuilderFactory.java | 92 ++++++++++++++++++++++ 9 files changed, 248 insertions(+), 28 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json index 14e8b7454d6e..2641b5448c8c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/salesforce.json @@ -213,16 +213,19 @@ "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] 
"exceptionHandler": { "index": 56, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] "exchangePattern": { "index": 57, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or None", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite API option to indicate to rollback all records if any are not successful." 
}, - "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "APEX method URL" }, - "compositeMethod": { "index": 60, "kind": "parameter", "displayName": "Composite Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite (raw) method." }, - "eventName": { "index": 61, "kind": "parameter", "displayName": "Event Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Name of Platform Event, Change Data Capture Event, custom event, etc." }, - "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName": "Event Schema Format", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", "enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "descrip [...] 
- "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event Schema Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The ID of the event schema." }, - "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw Http Headers", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as HTTP parameters for Raw operation." }, - "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "HTTP method to use for the Raw operation" }, - "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The portion of the endpoint URL after the domain name. E.g., '\/services\/data\/v52.0\/sobjects\/Accou [...] 
- "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName": "Raw Query Parameters", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as query parameters for Raw [...] - "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "workerPoolEnabled": { "index": 58, "kind": "parameter", "displayName": "Worker Pool Enabled", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Use thread-pool for processing received Salesforce events, for example to process events in parallel." }, + "workerPoolMaxSize": { "index": 59, "kind": "parameter", "displayName": "Worker Pool Max Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Maximum thread pool-size size for consumer worker pool." 
}, + "workerPoolSize": { "index": 60, "kind": "parameter", "displayName": "Worker Pool Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Minimum thread pool-size size for consumer worker pool." }, + "allOrNone": { "index": 61, "kind": "parameter", "displayName": "All Or None", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite API option to indicate to rollback all records if any are not successful." }, + "apexUrl": { "index": 62, "kind": "parameter", "displayName": "Apex Url", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "APEX method URL" }, + "compositeMethod": { "index": 63, "kind": "parameter", "displayName": "Composite Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite (raw) method." 
}, + "eventName": { "index": 64, "kind": "parameter", "displayName": "Event Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Name of Platform Event, Change Data Capture Event, custom event, etc." }, + "eventSchemaFormat": { "index": 65, "kind": "parameter", "displayName": "Event Schema Format", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", "enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "descrip [...] + "eventSchemaId": { "index": 66, "kind": "parameter", "displayName": "Event Schema Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The ID of the event schema." }, + "rawHttpHeaders": { "index": 67, "kind": "parameter", "displayName": "Raw Http Headers", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as HTTP parameters for Raw operation." 
}, + "rawMethod": { "index": 68, "kind": "parameter", "displayName": "Raw Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "HTTP method to use for the Raw operation" }, + "rawPath": { "index": 69, "kind": "parameter", "displayName": "Raw Path", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The portion of the endpoint URL after the domain name. E.g., '\/services\/data\/v52.0\/sobjects\/Accou [...] + "rawQueryParameters": { "index": 70, "kind": "parameter", "displayName": "Raw Query Parameters", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as query parameters for Raw [...] + "lazyStartProducer": { "index": 71, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
} } diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java index 4edb088bfa6c..733626b35de7 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointConfigurer.java @@ -154,6 +154,12 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl case "streamQueryResult": target.getConfiguration().setStreamQueryResult(property(camelContext, java.lang.Boolean.class, value)); return true; case "updatetopic": case "updateTopic": target.getConfiguration().setUpdateTopic(property(camelContext, boolean.class, value)); return true; + case "workerpoolenabled": + case "workerPoolEnabled": target.setWorkerPoolEnabled(property(camelContext, boolean.class, value)); return true; + case "workerpoolmaxsize": + case "workerPoolMaxSize": target.setWorkerPoolMaxSize(property(camelContext, int.class, value)); return true; + case "workerpoolsize": + case "workerPoolSize": target.setWorkerPoolSize(property(camelContext, int.class, value)); return true; default: return false; } } @@ -292,6 +298,12 @@ public class SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl case "streamQueryResult": return java.lang.Boolean.class; case "updatetopic": case "updateTopic": return boolean.class; + case "workerpoolenabled": + case "workerPoolEnabled": return boolean.class; + case "workerpoolmaxsize": + case "workerPoolMaxSize": return int.class; + case "workerpoolsize": + case "workerPoolSize": return int.class; default: return null; } } @@ -431,6 +443,12 @@ public class 
SalesforceEndpointConfigurer extends PropertyConfigurerSupport impl case "streamQueryResult": return target.getConfiguration().getStreamQueryResult(); case "updatetopic": case "updateTopic": return target.getConfiguration().isUpdateTopic(); + case "workerpoolenabled": + case "workerPoolEnabled": return target.isWorkerPoolEnabled(); + case "workerpoolmaxsize": + case "workerPoolMaxSize": return target.getWorkerPoolMaxSize(); + case "workerpoolsize": + case "workerPoolSize": return target.getWorkerPoolSize(); default: return null; } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java index 217a69701821..ab7046da2b4f 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/java/org/apache/camel/component/salesforce/SalesforceEndpointUriFactory.java @@ -23,7 +23,7 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(69); + Set<String> props = new HashSet<>(72); props.add("allOrNone"); props.add("apexMethod"); props.add("apexQueryParams"); @@ -93,6 +93,9 @@ public class SalesforceEndpointUriFactory extends org.apache.camel.support.compo props.add("streamQueryResult"); props.add("topicName"); props.add("updateTopic"); + props.add("workerPoolEnabled"); + props.add("workerPoolMaxSize"); + props.add("workerPoolSize"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); MULTI_VALUE_PREFIXES = 
Collections.emptyMap(); diff --git a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json index 14e8b7454d6e..2641b5448c8c 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json +++ b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json @@ -213,16 +213,19 @@ "bridgeErrorHandler": { "index": 55, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] "exceptionHandler": { "index": 56, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] 
"exchangePattern": { "index": 57, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "allOrNone": { "index": 58, "kind": "parameter", "displayName": "All Or None", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite API option to indicate to rollback all records if any are not successful." }, - "apexUrl": { "index": 59, "kind": "parameter", "displayName": "Apex Url", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "APEX method URL" }, - "compositeMethod": { "index": 60, "kind": "parameter", "displayName": "Composite Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite (raw) method." 
}, - "eventName": { "index": 61, "kind": "parameter", "displayName": "Event Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Name of Platform Event, Change Data Capture Event, custom event, etc." }, - "eventSchemaFormat": { "index": 62, "kind": "parameter", "displayName": "Event Schema Format", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", "enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "descrip [...] - "eventSchemaId": { "index": 63, "kind": "parameter", "displayName": "Event Schema Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The ID of the event schema." }, - "rawHttpHeaders": { "index": 64, "kind": "parameter", "displayName": "Raw Http Headers", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as HTTP parameters for Raw operation." 
}, - "rawMethod": { "index": 65, "kind": "parameter", "displayName": "Raw Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "HTTP method to use for the Raw operation" }, - "rawPath": { "index": 66, "kind": "parameter", "displayName": "Raw Path", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The portion of the endpoint URL after the domain name. E.g., '\/services\/data\/v52.0\/sobjects\/Accou [...] - "rawQueryParameters": { "index": 67, "kind": "parameter", "displayName": "Raw Query Parameters", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as query parameters for Raw [...] - "lazyStartProducer": { "index": 68, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
+ "workerPoolEnabled": { "index": 58, "kind": "parameter", "displayName": "Worker Pool Enabled", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Use thread-pool for processing received Salesforce events, for example to process events in parallel." }, + "workerPoolMaxSize": { "index": 59, "kind": "parameter", "displayName": "Worker Pool Max Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Maximum thread pool-size size for consumer worker pool." }, + "workerPoolSize": { "index": 60, "kind": "parameter", "displayName": "Worker Pool Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Minimum thread pool-size size for consumer worker pool." }, + "allOrNone": { "index": 61, "kind": "parameter", "displayName": "All Or None", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite API option to indicate to rollback all records if any are not successful." 
}, + "apexUrl": { "index": 62, "kind": "parameter", "displayName": "Apex Url", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "APEX method URL" }, + "compositeMethod": { "index": 63, "kind": "parameter", "displayName": "Composite Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Composite (raw) method." }, + "eventName": { "index": 64, "kind": "parameter", "displayName": "Event Name", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Name of Platform Event, Change Data Capture Event, custom event, etc." }, + "eventSchemaFormat": { "index": 65, "kind": "parameter", "displayName": "Event Schema Format", "group": "producer", "label": "producer", "required": false, "type": "enum", "javaType": "org.apache.camel.component.salesforce.internal.dto.EventSchemaFormatEnum", "enum": [ "EXPANDED", "COMPACT" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "descrip [...] 
+ "eventSchemaId": { "index": 66, "kind": "parameter", "displayName": "Event Schema Id", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The ID of the event schema." }, + "rawHttpHeaders": { "index": 67, "kind": "parameter", "displayName": "Raw Http Headers", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as HTTP parameters for Raw operation." }, + "rawMethod": { "index": 68, "kind": "parameter", "displayName": "Raw Method", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "HTTP method to use for the Raw operation" }, + "rawPath": { "index": 69, "kind": "parameter", "displayName": "Raw Path", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "The portion of the endpoint URL after the domain name. E.g., '\/services\/data\/v52.0\/sobjects\/Accou [...] 
+ "rawQueryParameters": { "index": 70, "kind": "parameter", "displayName": "Raw Query Parameters", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.salesforce.SalesforceEndpointConfig", "configurationField": "configuration", "description": "Comma separated list of message headers to include as query parameters for Raw [...] + "lazyStartProducer": { "index": 71, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] } } diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc index be5b55bd8088..8a160dbd1658 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc +++ b/components/camel-salesforce/camel-salesforce-component/src/main/docs/salesforce-component.adoc @@ -1884,6 +1884,15 @@ from("salesforce:subscribe:CamelTestTopic&sObjectName=Merchandise__c")... |=== +===== Parallel processing received events + +You can turn on `workerPoolEnabled=true` on the salesforce endpoint to let Camel use a thread-pool to process +the received events, which allows to process these events in parallel. + +NOTE: It has been reported that when receiving from PUSH_TOPIC events and then later sending a message +to salesforce (via producer) via the same thread could cause Salesforce to block. 
Enabling the worker pool +can help with this. See more in https://issues.apache.org/jira/browse/CAMEL-22332[CAMEL-22332]. + *Output* Type: Class passed via `sObjectName` parameter diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java index e1734bbee230..0dc99bf4f0d2 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java @@ -17,6 +17,7 @@ package org.apache.camel.component.salesforce; import java.util.Map; +import java.util.concurrent.ExecutorService; import com.salesforce.eventbus.protobuf.ReplayPreset; import org.apache.camel.AsyncCallback; @@ -39,13 +40,13 @@ public class PubSubApiConsumer extends DefaultConsumer { private String initialReplayId; private boolean fallbackToLatestReplayId; private final SalesforceEndpoint endpoint; + private ExecutorService executorService; private final int batchSize; private final PubSubDeserializeType deserializeType; private Class<?> pojoClass; private PubSubApiClient pubSubClient; private Map<String, Class<?>> eventClassMap; - private boolean usePlainTextConnection = false; public PubSubApiConsumer(SalesforceEndpoint endpoint, Processor processor) throws ClassNotFoundException { @@ -73,8 +74,22 @@ public class PubSubApiConsumer extends DefaultConsumer { in.setHeader(HEADER_SALESFORCE_PUBSUB_EVENT_ID, eventId); in.setHeader(HEADER_SALESFORCE_PUBSUB_REPLAY_ID, replayId); in.setHeader(HEADER_SALESFORCE_PUBSUB_RPC_ID, rpcId); + + // use default consumer callback AsyncCallback cb = defaultConsumerCallback(exchange, true); - getAsyncProcessor().process(exchange, cb); + if (executorService != 
null) { + executorService.submit(() -> getAsyncProcessor().process(exchange, cb)); + } else { + getAsyncProcessor().process(exchange, cb); + } + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; } @Override @@ -103,6 +118,15 @@ public class PubSubApiConsumer extends DefaultConsumer { super.doStop(); } + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + if (executorService != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); + executorService = null; + } + } + public String getTopic() { return topic; } diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java index f7a47db573ab..fefb219c2fdd 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.salesforce; +import java.util.concurrent.ExecutorService; + import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -72,6 +74,16 @@ public class SalesforceEndpoint extends DefaultEndpoint { @UriParam(label = "consumer", description = "The replayId value to use when subscribing to the Pub/Sub API.") private String pubSubReplayId; + @UriParam(label = "consumer,advanced", + description = "Use thread-pool for processing received Salesforce events, for example to process events in parallel.") + private boolean workerPoolEnabled; + @UriParam(label = 
"consumer,advanced", description = "Minimum thread pool-size size for consumer worker pool.", + defaultValue = "1") + private int workerPoolSize = 1; + @UriParam(label = "consumer,advanced", description = "Maximum thread pool-size size for consumer worker pool.", + defaultValue = "10") + private int workerPoolMaxSize = 10; + public SalesforceEndpoint(String uri, SalesforceComponent salesforceComponent, SalesforceEndpointConfig configuration, OperationName operationName, String topicName) { super(uri, salesforceComponent); @@ -94,14 +106,23 @@ public class SalesforceEndpoint extends DefaultEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { + ExecutorService executorService = null; + if (workerPoolEnabled) { + executorService = getCamelContext().getExecutorServiceManager().newThreadPool(this, "SalesforceWorkerPool", + workerPoolSize, workerPoolMaxSize); + } Consumer consumer = null; switch (operationName) { case SUBSCRIBE -> { final SubscriptionHelper subscriptionHelper = getComponent().getSubscriptionHelper(); - consumer = new StreamingApiConsumer(this, processor, subscriptionHelper); + StreamingApiConsumer answer = new StreamingApiConsumer(this, processor, subscriptionHelper); + answer.setExecutorService(executorService); + consumer = answer; } case PUBSUB_SUBSCRIBE -> { - consumer = new PubSubApiConsumer(this, processor); + PubSubApiConsumer answer = new PubSubApiConsumer(this, processor); + answer.setExecutorService(executorService); + consumer = answer; } default -> { // NO OP @@ -144,6 +165,30 @@ public class SalesforceEndpoint extends DefaultEndpoint { this.pubSubReplayId = pubSubReplayId; } + public boolean isWorkerPoolEnabled() { + return workerPoolEnabled; + } + + public void setWorkerPoolEnabled(boolean workerPoolEnabled) { + this.workerPoolEnabled = workerPoolEnabled; + } + + public int getWorkerPoolSize() { + return workerPoolSize; + } + + public void setWorkerPoolSize(int workerPoolSize) { + this.workerPoolSize = 
workerPoolSize; + } + + public int getWorkerPoolMaxSize() { + return workerPoolMaxSize; + } + + public void setWorkerPoolMaxSize(int workerPoolMaxSize) { + this.workerPoolMaxSize = workerPoolMaxSize; + } + @Override protected void doStart() throws Exception { try { diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java index 4dcfd100a6e5..95a8a5e730dd 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/StreamingApiConsumer.java @@ -19,6 +19,7 @@ package org.apache.camel.component.salesforce; import java.io.IOException; import java.io.StringReader; import java.util.Map; +import java.util.concurrent.ExecutorService; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.AsyncCallback; @@ -74,6 +75,7 @@ public class StreamingApiConsumer extends DefaultConsumer { private final SalesforceEndpoint endpoint; private final MessageKind messageKind; private final ObjectMapper objectMapper; + private ExecutorService executorService; private final boolean rawPayload; private Class<?> sObjectClass; @@ -104,6 +106,14 @@ public class StreamingApiConsumer extends DefaultConsumer { rawPayload = endpoint.getConfiguration().isRawPayload(); } + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + public String getTopicName() { return topicName; } @@ -141,7 +151,11 @@ public class StreamingApiConsumer extends DefaultConsumer { // use default consumer callback AsyncCallback cb = 
defaultConsumerCallback(exchange, true); - getAsyncProcessor().process(exchange, cb); + if (executorService != null) { + executorService.submit(() -> getAsyncProcessor().process(exchange, cb)); + } else { + getAsyncProcessor().process(exchange, cb); + } } @SuppressWarnings("unchecked") @@ -333,6 +347,15 @@ public class StreamingApiConsumer extends DefaultConsumer { } } + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + if (executorService != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); + executorService = null; + } + } + // May be necessary to call from some unit tests. void determineSObjectClass() { // get sObjectClass to convert to diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java index 4971cfc96e79..d8b7b02f923b 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/SalesforceEndpointBuilderFactory.java @@ -1497,6 +1497,98 @@ public interface SalesforceEndpointBuilderFactory { doSetProperty("exchangePattern", exchangePattern); return this; } + /** + * Use thread-pool for processing received Salesforce events, for + * example to process events in parallel. + * + * The option is a: <code>boolean</code> type. 
+ * + * Default: false + * Group: consumer (advanced) + * + * @param workerPoolEnabled the value to set + * @return the dsl builder + */ + default AdvancedSalesforceEndpointConsumerBuilder workerPoolEnabled(boolean workerPoolEnabled) { + doSetProperty("workerPoolEnabled", workerPoolEnabled); + return this; + } + /** + * Use thread-pool for processing received Salesforce events, for + * example to process events in parallel. + * + * The option will be converted to a <code>boolean</code> type. + * + * Default: false + * Group: consumer (advanced) + * + * @param workerPoolEnabled the value to set + * @return the dsl builder + */ + default AdvancedSalesforceEndpointConsumerBuilder workerPoolEnabled(String workerPoolEnabled) { + doSetProperty("workerPoolEnabled", workerPoolEnabled); + return this; + } + /** + * Maximum thread pool size for consumer worker pool. + * + * The option is a: <code>int</code> type. + * + * Default: 10 + * Group: consumer (advanced) + * + * @param workerPoolMaxSize the value to set + * @return the dsl builder + */ + default AdvancedSalesforceEndpointConsumerBuilder workerPoolMaxSize(int workerPoolMaxSize) { + doSetProperty("workerPoolMaxSize", workerPoolMaxSize); + return this; + } + /** + * Maximum thread pool size for consumer worker pool. + * + * The option will be converted to an <code>int</code> type. + * + * Default: 10 + * Group: consumer (advanced) + * + * @param workerPoolMaxSize the value to set + * @return the dsl builder + */ + default AdvancedSalesforceEndpointConsumerBuilder workerPoolMaxSize(String workerPoolMaxSize) { + doSetProperty("workerPoolMaxSize", workerPoolMaxSize); + return this; + } + /** + * Minimum thread pool size for consumer worker pool. + * + * The option is a: <code>int</code> type. 
+ * + * Default: 1 + * Group: consumer (advanced) + * + * @param workerPoolSize the value to set + * @return the dsl builder + */ + default AdvancedSalesforceEndpointConsumerBuilder workerPoolSize(int workerPoolSize) { + doSetProperty("workerPoolSize", workerPoolSize); + return this; + } + /** + * Minimum thread pool size for consumer worker pool. + * + * The option will be converted to an <code>int</code> type. + * + * Default: 1 + * Group: consumer (advanced) + * + * @param workerPoolSize the value to set + * @return the dsl builder + */ + default AdvancedSalesforceEndpointConsumerBuilder workerPoolSize(String workerPoolSize) { + doSetProperty("workerPoolSize", workerPoolSize); + return this; + } } /**
