This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6d9dcca28a1 feat(camel-salesforce): Add Event ID as header for Pub/Sub
API event consumption (#17298)
6d9dcca28a1 is described below
commit 6d9dcca28a1b2b4ced2ea02b7c38f840e64f2fed
Author: Nathan Allen <[email protected]>
AuthorDate: Fri Feb 28 03:03:20 2025 -0500
feat(camel-salesforce): Add Event ID as header for Pub/Sub API event
consumption (#17298)
* add event id as header for pub/sub consumption
* change header javaTypes
* generate json
---
.../camel/component/salesforce/salesforce.json | 39 +++++++++++-----------
.../component/salesforce/PubSubApiConsumer.java | 4 ++-
.../component/salesforce/SalesforceConstants.java | 9 +++--
.../internal/client/PubSubApiClient.java | 3 +-
4 files changed, 32 insertions(+), 23 deletions(-)
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
index 0501bd30708..8b4597286f3 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
+++
b/components/camel-salesforce/camel-salesforce-component/src/generated/resources/META-INF/org/apache/camel/component/salesforce/salesforce.json
@@ -130,25 +130,26 @@
},
"headers": {
"CamelSalesforceReplayId": { "index": 0, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Object", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The Streaming API replayId.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_REPLAY_ID"
},
- "CamelSalesforcePubSubReplayId": { "index": 1, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The Pub\/Sub API replayId.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_REPLAY_ID"
},
- "CamelSalesforceChangeEventSchema": { "index": 2, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The change event schema.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA"
},
- "CamelSalesforceEventType": { "index": 3, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The event type.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_EVENT_TYPE"
},
- "CamelSalesforceCommitTimestamp": { "index": 4, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The commit timestamp.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_TIMESTAMP"
},
- "CamelSalesforceCommitUser": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The commit user.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_USER"
},
- "CamelSalesforceCommitNumber": { "index": 6, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The commit number.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_NUMBER"
},
- "CamelSalesforceRecordIds": { "index": 7, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Object", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The record ids.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_RECORD_IDS"
},
- "CamelSalesforceChangeType": { "index": 8, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The change type.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_TYPE"
},
- "CamelSalesforceChangeOrigin": { "index": 9, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The change origin.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_ORIGIN"
},
- "CamelSalesforceTransactionKey": { "index": 10, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The transaction key.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TRANSACTION_KEY"
},
- "CamelSalesforceSequenceNumber": { "index": 11, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The sequence number.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_SEQUENCE_NUMBER"
},
- "CamelSalesforceIsTransactionEnd": { "index": 12, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Is transaction end.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_IS_TRANSACTION_END"
},
- "CamelSalesforceEntityName": { "index": 13, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The entity name.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_ENTITY_NAME"
},
- "CamelSalesforcePlatformEventSchema": { "index": 14, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The platform event schema.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PLATFORM_EVENT_SCHEMA"
},
- "CamelSalesforceCreatedDate": { "index": 15, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "java.time.ZonedDateTime", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "description": "The created date.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CREATED_DATE"
},
- "CamelSalesforceTopicName": { "index": 16, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The topic name.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TOPIC_NAME"
},
- "CamelSalesforceChannel": { "index": 17, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The channel.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANNEL"
},
- "CamelSalesforceClientId": { "index": 18, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The client id.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CLIENT_ID"
},
- "CamelSalesforceQueryResultTotalSize": { "index": 19, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "int", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Total number of records matching a
query.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE"
}
+ "CamelSalesforceChangeEventSchema": { "index": 1, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The change event schema.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA"
},
+ "CamelSalesforceEventType": { "index": 2, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The event type.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_EVENT_TYPE"
},
+ "CamelSalesforceCommitTimestamp": { "index": 3, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The commit timestamp.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_TIMESTAMP"
},
+ "CamelSalesforceCommitUser": { "index": 4, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The commit user.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_USER"
},
+ "CamelSalesforceCommitNumber": { "index": 5, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The commit number.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_COMMIT_NUMBER"
},
+ "CamelSalesforceRecordIds": { "index": 6, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"Object", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The record ids.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_RECORD_IDS"
},
+ "CamelSalesforceChangeType": { "index": 7, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The change type.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_TYPE"
},
+ "CamelSalesforceChangeOrigin": { "index": 8, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The change origin.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANGE_ORIGIN"
},
+ "CamelSalesforceTransactionKey": { "index": 9, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The transaction key.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TRANSACTION_KEY"
},
+ "CamelSalesforceSequenceNumber": { "index": 10, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The sequence number.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_SEQUENCE_NUMBER"
},
+ "CamelSalesforceIsTransactionEnd": { "index": 11, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Is transaction end.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_IS_TRANSACTION_END"
},
+ "CamelSalesforceEntityName": { "index": 12, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The entity name.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_ENTITY_NAME"
},
+ "CamelSalesforcePlatformEventSchema": { "index": 13, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "Object", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The platform event schema.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PLATFORM_EVENT_SCHEMA"
},
+ "CamelSalesforceCreatedDate": { "index": 14, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "java.time.ZonedDateTime", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "description": "The created date.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CREATED_DATE"
},
+ "CamelSalesforceTopicName": { "index": 15, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The topic name.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_TOPIC_NAME"
},
+ "CamelSalesforceChannel": { "index": 16, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The channel.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CHANNEL"
},
+ "CamelSalesforceClientId": { "index": 17, "kind": "header", "displayName":
"", "group": "consumer", "label": "consumer", "required": false, "javaType":
"String", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The client id.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_CLIENT_ID"
},
+ "CamelSalesforcePubSubReplayId": { "index": 18, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The Pub\/Sub API replayId.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_REPLAY_ID"
},
+ "CamelSalesforcePubSubEventId": { "index": 19, "kind": "header",
"displayName": "", "group": "consumer", "label": "consumer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The Pub\/Sub API event id.",
"constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_PUBSUB_EVENT_ID"
},
+ "CamelSalesforceQueryResultTotalSize": { "index": 20, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "int", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Total number of records matching a
query.", "constantName":
"org.apache.camel.component.salesforce.SalesforceConstants#HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE"
}
},
"properties": {
"operationName": { "index": 0, "kind": "path", "displayName": "Operation
Name", "group": "common", "label": "common", "required": true, "type":
"object", "javaType":
"org.apache.camel.component.salesforce.internal.OperationName", "enum": [
"getVersions", "getResources", "getGlobalObjects", "getBasicInfo",
"getDescription", "getSObject", "createSObject", "updateSObject",
"deleteSObject", "getSObjectWithId", "upsertSObject", "deleteSObjectWithId",
"getBlobField", "query", "queryMore", [...]
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
index ab7310e3eec..2f86af72c96 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/PubSubApiConsumer.java
@@ -28,6 +28,7 @@ import
org.apache.camel.component.salesforce.internal.client.PubSubApiClient;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
+import static
org.apache.camel.component.salesforce.SalesforceConstants.HEADER_SALESFORCE_PUBSUB_EVENT_ID;
import static
org.apache.camel.component.salesforce.SalesforceConstants.HEADER_SALESFORCE_PUBSUB_REPLAY_ID;
public class PubSubApiConsumer extends DefaultConsumer {
@@ -62,10 +63,11 @@ public class PubSubApiConsumer extends DefaultConsumer {
}
}
- public void processEvent(Object recordObj, String replayId) {
+ public void processEvent(Object recordObj, String eventId, String
replayId) {
final Exchange exchange = createExchange(true);
final Message in = exchange.getIn();
in.setBody(recordObj);
+ in.setHeader(HEADER_SALESFORCE_PUBSUB_EVENT_ID, eventId);
in.setHeader(HEADER_SALESFORCE_PUBSUB_REPLAY_ID, replayId);
AsyncCallback cb = defaultConsumerCallback(exchange, true);
getAsyncProcessor().process(exchange, cb);
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
index 61b6d78316a..b1c6e2703e1 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceConstants.java
@@ -20,10 +20,9 @@ import org.apache.camel.spi.Metadata;
public final class SalesforceConstants {
+ // Streaming headers
@Metadata(label = "consumer", description = "The Streaming API replayId.",
javaType = "Object")
public static final String HEADER_SALESFORCE_REPLAY_ID =
"CamelSalesforceReplayId";
- @Metadata(label = "consumer", description = "The Pub/Sub API replayId.",
javaType = "Object")
- public static final String HEADER_SALESFORCE_PUBSUB_REPLAY_ID =
"CamelSalesforcePubSubReplayId";
@Metadata(label = "consumer", description = "The change event schema.",
javaType = "Object")
public static final String HEADER_SALESFORCE_CHANGE_EVENT_SCHEMA =
"CamelSalesforceChangeEventSchema";
@Metadata(label = "consumer", description = "The event type.", javaType =
"String")
@@ -59,6 +58,12 @@ public final class SalesforceConstants {
@Metadata(label = "consumer", description = "The client id.", javaType =
"String")
public static final String HEADER_SALESFORCE_CLIENT_ID =
"CamelSalesforceClientId";
+ // Pub/Sub API headers
+ @Metadata(label = "consumer", description = "The Pub/Sub API replayId.",
javaType = "String")
+ public static final String HEADER_SALESFORCE_PUBSUB_REPLAY_ID =
"CamelSalesforcePubSubReplayId";
+ @Metadata(label = "consumer", description = "The Pub/Sub API event id.",
javaType = "String")
+ public static final String HEADER_SALESFORCE_PUBSUB_EVENT_ID =
"CamelSalesforcePubSubEventId";
+
@Metadata(label = "producer", description = "Total number of records
matching a query.", javaType = "int")
public static final String HEADER_SALESFORCE_QUERY_RESULT_TOTAL_SIZE =
"CamelSalesforceQueryResultTotalSize";
diff --git
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
index ef16d7ecf4f..375b08b0ee3 100644
---
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
+++
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/PubSubApiClient.java
@@ -457,8 +457,9 @@ public class PubSubApiClient extends ServiceSupport {
case POJO -> deserializePojo(ce, schema);
case JSON -> deserializeJson(ce, schema);
};
+ String eventId = ce.getEvent().getId();
String replayId =
PubSubApiClient.base64EncodeByteString(ce.getReplayId());
- consumer.processEvent(recordObj, replayId);
+ consumer.processEvent(recordObj, eventId, replayId);
}
private Object deserializeAvro(ConsumerEvent ce, Schema schema) throws
IOException {