This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push:
new ea3e5ce7 fixed Azure Storage Blob changefeed source connector: the
connector produces one record per event; the data it produces is in JSON
format; the connector adds additional info as record headers
ea3e5ce7 is described below
commit ea3e5ce79b1825c26bc48c84a8fd703ee4aed9d2
Author: Andrea Tarocchi <[email protected]>
AuthorDate: Fri Dec 16 10:00:23 2022 +0100
fixed Azure Storage Blob changefeed source connector
the connector produces one record per event
the data produced by the connector is in JSON format
the connector adds additional info as record headers
---
...ure-storage-blob-changefeed-source.kamelet.yaml | 68 +++++++++++++++++++---
...ure-storage-blob-changefeed-source.kamelet.yaml | 68 +++++++++++++++++++---
2 files changed, 122 insertions(+), 14 deletions(-)
diff --git a/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml b/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml
index a4afb7c4..aa511d84 100644
--- a/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml
+++ b/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml
@@ -66,18 +66,72 @@ spec:
- "camel:azure-storage-blob"
- "camel:kamelet"
- "camel:core"
+ - 'camel:jackson'
- "camel:jsonpath"
- "camel:timer"
+ - "mvn:com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.4"
+ - "mvn:com.azure:azure-storage-blob-changefeed:12.0.0-beta.17"
template:
from:
uri: "timer:azure-storage-blob-stream"
parameters:
period: "{{period}}"
steps:
- - to:
- uri: "azure-storage-blob:{{accountName}}"
- parameters:
- operation: "getChangeFeed"
- accessKey: "{{accessKey}}"
- credentialType: "{{credentialType}}"
- - to: "kamelet:sink"
+ - to:
+ uri: "azure-storage-blob:{{accountName}}"
+ parameters:
+ operation: "getChangeFeed"
+ accessKey: "{{accessKey}}"
+ credentialType: "{{credentialType}}"
+ - split:
+ expression:
+ simple: "${body}"
+ - marshal:
+ json:
+ library: Jackson
+ module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+ unmarshalType: com.fasterxml.jackson.databind.JsonNode
+ - set-header:
+ name: "azure-storage-blob-changefeed-topic"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.topic
+ - set-header:
+ name: "azure-storage-blob-changefeed-subject"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.subject
+ - set-header:
+ name: "azure-storage-blob-changefeed-eventType"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.eventType
+ - set-header:
+ name: "azure-storage-blob-changefeed-eventTime"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.eventTime
+ - set-header:
+ name: "azure-storage-blob-changefeed-id"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.id
+ - set-header:
+ name: "azure-storage-blob-changefeed-dataVersion"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.dataVersion
+ - set-header:
+ name: "azure-storage-blob-changefeed-metadataVersion"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.metadataVersion
+ - set-body:
+ jsonpath:
+ expression: $.data
+ - marshal:
+ json:
+ library: Jackson
+ module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+ - to: "kamelet:sink"
+
diff --git a/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml
index a4afb7c4..aa511d84 100644
--- a/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml
+++ b/library/camel-kamelets/src/main/resources/kamelets/azure-storage-blob-changefeed-source.kamelet.yaml
@@ -66,18 +66,72 @@ spec:
- "camel:azure-storage-blob"
- "camel:kamelet"
- "camel:core"
+ - 'camel:jackson'
- "camel:jsonpath"
- "camel:timer"
+ - "mvn:com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.4"
+ - "mvn:com.azure:azure-storage-blob-changefeed:12.0.0-beta.17"
template:
from:
uri: "timer:azure-storage-blob-stream"
parameters:
period: "{{period}}"
steps:
- - to:
- uri: "azure-storage-blob:{{accountName}}"
- parameters:
- operation: "getChangeFeed"
- accessKey: "{{accessKey}}"
- credentialType: "{{credentialType}}"
- - to: "kamelet:sink"
+ - to:
+ uri: "azure-storage-blob:{{accountName}}"
+ parameters:
+ operation: "getChangeFeed"
+ accessKey: "{{accessKey}}"
+ credentialType: "{{credentialType}}"
+ - split:
+ expression:
+ simple: "${body}"
+ - marshal:
+ json:
+ library: Jackson
+ module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+ unmarshalType: com.fasterxml.jackson.databind.JsonNode
+ - set-header:
+ name: "azure-storage-blob-changefeed-topic"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.topic
+ - set-header:
+ name: "azure-storage-blob-changefeed-subject"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.subject
+ - set-header:
+ name: "azure-storage-blob-changefeed-eventType"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.eventType
+ - set-header:
+ name: "azure-storage-blob-changefeed-eventTime"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.eventTime
+ - set-header:
+ name: "azure-storage-blob-changefeed-id"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.id
+ - set-header:
+ name: "azure-storage-blob-changefeed-dataVersion"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.dataVersion
+ - set-header:
+ name: "azure-storage-blob-changefeed-metadataVersion"
+ jsonpath:
+ suppress-exceptions: true
+ expression: $.metadataVersion
+ - set-body:
+ jsonpath:
+ expression: $.data
+ - marshal:
+ json:
+ library: Jackson
+ module-class-names: com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
+ - to: "kamelet:sink"
+