This is an automated email from the ASF dual-hosted git repository.
cdeppisch pushed a commit to branch 4.4.x
in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/4.4.x by this push:
new 2cf846f9 Fix AWS DDB Streams Source Kamelet
2cf846f9 is described below
commit 2cf846f91f5b43def96a02cac3e25a283ee3988c
Author: Christoph Deppisch <[email protected]>
AuthorDate: Mon Apr 29 11:06:45 2024 +0200
Fix AWS DDB Streams Source Kamelet
- Use specific data type transformation to marshal the aws2-ddb record
domain model to Json
- Arbitrary Gson marshalling is not able to handle Java time instant type
that is being used in aws2-ddb domain model
- Add Java time instant Gson type adapter
- Use Gson with timer instant type adapter in aws2-ddb Json Struct data
type transformer
- Add YAKS test to verify aws2-ddb-streams-source Kamelet
---
kamelets/aws-ddb-streams-source.kamelet.yaml | 53 +++++++++++++++++++---
library/camel-kamelets-utils/pom.xml | 7 +++
.../gson/JavaTimeInstantTypeAdapter.java | 44 ++++++++++++++++++
.../ddb/Ddb2JsonStructDataTypeTransformer.java | 45 ++++++++++++++++++
.../transformer/aws2-ddb-application-x-struct | 1 +
.../transformer/aws2-ddb-application-x-struct.json | 13 ++++++
.../kamelets/aws-ddb-streams-source.kamelet.yaml | 53 +++++++++++++++++++---
.../test/resources/aws/ddb/amazonDDBClient.groovy | 9 ++++
.../aws/ddb/aws-ddb-sink-deleteItem.feature | 1 +
.../resources/aws/ddb/aws-ddb-sink-putItem.feature | 1 +
.../aws/ddb/aws-ddb-sink-updateItem.feature | 1 +
...Item.feature => aws-ddb-source-getItem.feature} | 32 +++++--------
.../resources/aws/ddb/aws-ddb-source-pipe.yaml | 42 +++++++++++++++++
.../src/test/resources/aws/ddb/yaks-config.yaml | 1 +
.../test/resources/citrus-application.properties | 4 +-
15 files changed, 273 insertions(+), 34 deletions(-)
diff --git a/kamelets/aws-ddb-streams-source.kamelet.yaml
b/kamelets/aws-ddb-streams-source.kamelet.yaml
index 5c7bb8a2..8e024bd2 100644
--- a/kamelets/aws-ddb-streams-source.kamelet.yaml
+++ b/kamelets/aws-ddb-streams-source.kamelet.yaml
@@ -67,7 +67,7 @@ spec:
enum: ["ap-south-1", "eu-south-1", "us-gov-east-1", "me-central-1",
"ca-central-1", "eu-central-1", "us-iso-west-1", "us-west-1", "us-west-2",
"af-south-1", "eu-north-1", "eu-west-3", "eu-west-2", "eu-west-1",
"ap-northeast-3", "ap-northeast-2", "ap-northeast-1", "me-south-1",
"sa-east-1", "ap-east-1", "cn-north-1", "us-gov-west-1", "ap-southeast-1",
"ap-southeast-2", "us-iso-east-1", "ap-southeast-3", "us-east-1", "us-east-2",
"cn-northwest-1", "us-isob-east-1", "aws-global", "a [...]
streamIteratorType:
title: Stream Iterator Type
- description: Defines where in the DynamoDB stream to start getting
records. There are two enums and the value can be one of FROM_LATEST and
FROM_START. Note that using FROM_START can cause a significant delay before the
stream has caught up to real-time.
+ description: Defines where in the DynamoDB stream to start getting
records. There are two enums and the value can be one of FROM_LATEST and
FROM_START. Note that using FROM_START can cause a significant delay before the
stream has caught up to real-time.
type: string
default: FROM_LATEST
useDefaultCredentialsProvider:
@@ -89,10 +89,52 @@ spec:
description: The number of milliseconds before the next poll from the
database.
type: integer
default: 500
- types:
+ dataTypes:
out:
- mediaType: application/json
+ default: json
+ headers:
+ CamelAwsDdbStreamEventSource:
+ title: The DDB Stream Event Source
+ description: The Amazon Web Services service from which the stream
record originated. For DynamoDB Streams, this is aws:dynamodb.
+ type: string
+ CamelAwsDdbStreamEventId:
+ title: The DDB Stream Event Id
+ description: A globally unique identifier for the event that was
recorded in this stream record.
+ type: string
+ types:
+ json:
+ format: "application-json"
+ description: Default Json representation of a DDB Stream Event.
+ mediaType: application/json
+ cloudevents:
+ format: "aws2-ddbstream:application-cloudevents"
+ description: |-
+ Data type transformer converts AWS Dynamo DB Streams get records
response to CloudEvent v1_0 data format. The data
+ type sets Camel specific CloudEvent headers with values extracted
from AWS Dynamo DB Streams get records.
+ headers:
+ CamelCloudEventID:
+ title: CloudEvent ID
+ description: The Camel exchange id set as event id
+ type: string
+ CamelCloudEventType:
+ title: CloudEvent Type
+ description: The event type
+ default: "org.apache.camel.event.aws.ddbstream.getRecords"
+ type: string
+ CamelCloudEventSource:
+ title: CloudEvent Source
+ description: The event source. By default, the DDB Stream Event
source receipt handle with prefix "aws.ddbstream.".
+ type: string
+ CamelCloudEventSubject:
+ title: CloudEvent Subject
+ description: The event subject. The DDB Stream Event Id.
+ type: string
+ CamelCloudEventTime:
+ title: CloudEvent Time
+ description: The exchange creation timestamp as event time.
+ type: string
dependencies:
+ - mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.2-SNAPSHOT
- "camel:gson"
- "camel:aws2-ddb"
- "camel:kamelet"
@@ -109,7 +151,6 @@ spec:
overrideEndpoint: "{{overrideEndpoint}}"
delay: "{{delay}}"
steps:
- - marshal:
- json:
- library: Gson
+ - transform:
+ toType: "aws2-ddb:application-x-struct"
- to: "kamelet:sink"
diff --git a/library/camel-kamelets-utils/pom.xml
b/library/camel-kamelets-utils/pom.xml
index dde20c47..95196f25 100644
--- a/library/camel-kamelets-utils/pom.xml
+++ b/library/camel-kamelets-utils/pom.xml
@@ -71,6 +71,13 @@
<artifactId>camel-kafka</artifactId>
</dependency>
+ <!-- Dependencies for Gson serialization type adapter -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-gson</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Dependencies for azure credential configuration -->
<dependency>
<groupId>org.apache.camel</groupId>
diff --git
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
new file mode 100644
index 00000000..eaeb5e7a
--- /dev/null
+++
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/gson/JavaTimeInstantTypeAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.serialization.gson;
+
+import java.lang.reflect.Type;
+import java.time.Instant;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+public class JavaTimeInstantTypeAdapter implements JsonSerializer<Instant>,
JsonDeserializer<Instant> {
+
+ @Override
+ public JsonElement serialize(final Instant time, final Type typeOfSrc,
+ final JsonSerializationContext context) {
+ return new JsonPrimitive(time.getEpochSecond() * 1000);
+ }
+
+ @Override
+ public Instant deserialize(final JsonElement json, final Type typeOfT,
+ final JsonDeserializationContext context)
throws JsonParseException {
+ return Instant.ofEpochMilli(json.getAsLong());
+ }
+}
diff --git
a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
new file mode 100644
index 00000000..e2dbb028
--- /dev/null
+++
b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.transform.aws2.ddb;
+
+import java.time.Instant;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.camel.Message;
+import
org.apache.camel.kamelets.utils.serialization.gson.JavaTimeInstantTypeAdapter;
+import org.apache.camel.spi.DataType;
+import org.apache.camel.spi.DataTypeTransformer;
+import org.apache.camel.spi.Transformer;
+
+@DataTypeTransformer(name = "aws2-ddb:application-x-struct")
+public class Ddb2JsonStructDataTypeTransformer extends Transformer {
+
+ private final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Instant.class, new
JavaTimeInstantTypeAdapter())
+ .create();
+
+ @Override
+ public void transform(Message message, DataType fromType, DataType toType)
{
+ if (message.getBody() instanceof String) {
+ return;
+ }
+
+ message.setBody(gson.toJson(message.getBody()));
+ }
+}
diff --git
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
new file mode 100644
index 00000000..32bb47c1
--- /dev/null
+++
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct
@@ -0,0 +1 @@
+class=org.apache.camel.kamelets.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer
diff --git
a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
new file mode 100644
index 00000000..49f0f390
--- /dev/null
+++
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json
@@ -0,0 +1,13 @@
+{
+ "transformer": {
+ "kind": "transformer",
+ "name": "aws2-ddb:application-x-struct",
+ "title": "Aws2 Ddb (Application Json Struct)",
+ "description": "Transforms DynamoDB record into a Json node",
+ "deprecated": false,
+ "javaType":
"org.apache.camel.kamelets.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-aws2-ddb",
+ "version": "4.4.2-SNAPSHOT"
+ }
+}
diff --git
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
index 5c7bb8a2..8e024bd2 100644
---
a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
+++
b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml
@@ -67,7 +67,7 @@ spec:
enum: ["ap-south-1", "eu-south-1", "us-gov-east-1", "me-central-1",
"ca-central-1", "eu-central-1", "us-iso-west-1", "us-west-1", "us-west-2",
"af-south-1", "eu-north-1", "eu-west-3", "eu-west-2", "eu-west-1",
"ap-northeast-3", "ap-northeast-2", "ap-northeast-1", "me-south-1",
"sa-east-1", "ap-east-1", "cn-north-1", "us-gov-west-1", "ap-southeast-1",
"ap-southeast-2", "us-iso-east-1", "ap-southeast-3", "us-east-1", "us-east-2",
"cn-northwest-1", "us-isob-east-1", "aws-global", "a [...]
streamIteratorType:
title: Stream Iterator Type
- description: Defines where in the DynamoDB stream to start getting
records. There are two enums and the value can be one of FROM_LATEST and
FROM_START. Note that using FROM_START can cause a significant delay before the
stream has caught up to real-time.
+ description: Defines where in the DynamoDB stream to start getting
records. There are two enums and the value can be one of FROM_LATEST and
FROM_START. Note that using FROM_START can cause a significant delay before the
stream has caught up to real-time.
type: string
default: FROM_LATEST
useDefaultCredentialsProvider:
@@ -89,10 +89,52 @@ spec:
description: The number of milliseconds before the next poll from the
database.
type: integer
default: 500
- types:
+ dataTypes:
out:
- mediaType: application/json
+ default: json
+ headers:
+ CamelAwsDdbStreamEventSource:
+ title: The DDB Stream Event Source
+ description: The Amazon Web Services service from which the stream
record originated. For DynamoDB Streams, this is aws:dynamodb.
+ type: string
+ CamelAwsDdbStreamEventId:
+ title: The DDB Stream Event Id
+ description: A globally unique identifier for the event that was
recorded in this stream record.
+ type: string
+ types:
+ json:
+ format: "application-json"
+ description: Default Json representation of a DDB Stream Event.
+ mediaType: application/json
+ cloudevents:
+ format: "aws2-ddbstream:application-cloudevents"
+ description: |-
+ Data type transformer converts AWS Dynamo DB Streams get records
response to CloudEvent v1_0 data format. The data
+ type sets Camel specific CloudEvent headers with values extracted
from AWS Dynamo DB Streams get records.
+ headers:
+ CamelCloudEventID:
+ title: CloudEvent ID
+ description: The Camel exchange id set as event id
+ type: string
+ CamelCloudEventType:
+ title: CloudEvent Type
+ description: The event type
+ default: "org.apache.camel.event.aws.ddbstream.getRecords"
+ type: string
+ CamelCloudEventSource:
+ title: CloudEvent Source
+ description: The event source. By default, the DDB Stream Event
source receipt handle with prefix "aws.ddbstream.".
+ type: string
+ CamelCloudEventSubject:
+ title: CloudEvent Subject
+ description: The event subject. The DDB Stream Event Id.
+ type: string
+ CamelCloudEventTime:
+ title: CloudEvent Time
+ description: The exchange creation timestamp as event time.
+ type: string
dependencies:
+ - mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.2-SNAPSHOT
- "camel:gson"
- "camel:aws2-ddb"
- "camel:kamelet"
@@ -109,7 +151,6 @@ spec:
overrideEndpoint: "{{overrideEndpoint}}"
delay: "{{delay}}"
steps:
- - marshal:
- json:
- library: Gson
+ - transform:
+ toType: "aws2-ddb:application-x-struct"
- to: "kamelet:sink"
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy
index 95e18b40..5c1d7ef1 100644
---
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy
+++
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy
@@ -25,6 +25,8 @@ import
software.amazon.awssdk.services.dynamodb.model.KeySchemaElement
import software.amazon.awssdk.services.dynamodb.model.KeyType
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType
+import software.amazon.awssdk.services.dynamodb.model.StreamSpecification
+import software.amazon.awssdk.services.dynamodb.model.StreamViewType
DynamoDbClient amazonDDBClient = DynamoDbClient
.builder()
@@ -45,6 +47,13 @@ amazonDDBClient.createTable(b -> {
b.attributeDefinitions(
AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.N).build(),
)
+
+ if (${aws.ddb.streams}) {
+ b.streamSpecification(StreamSpecification.builder()
+ .streamEnabled(true)
+ .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES).build())
+ }
+
b.provisionedThroughput(
ProvisionedThroughput.builder()
.readCapacityUnits(1L)
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature
index 7f45f1d0..e1a731b7 100644
---
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature
+++
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature
@@ -20,6 +20,7 @@ Feature: AWS DDB Sink - DeleteItem
Background:
Given variables
| timer.source.period | 10000 |
+ | aws.ddb.streams | false |
| aws.ddb.operation | DeleteItem |
| aws.ddb.tableName | movies |
| aws.ddb.item.id | 1 |
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
index f9b889ac..a3261236 100644
---
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
+++
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
@@ -20,6 +20,7 @@ Feature: AWS DDB Sink - PutItem
Background:
Given variables
| timer.source.period | 10000 |
+ | aws.ddb.streams | false |
| aws.ddb.operation | PutItem |
| aws.ddb.tableName | movies |
| aws.ddb.item.id | 1 |
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature
index 956c5d1e..02b07d93 100644
---
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature
+++
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature
@@ -20,6 +20,7 @@ Feature: AWS DDB Sink - UpdateItem
Background:
Given variables
| timer.source.period | 10000 |
+ | aws.ddb.streams | false |
| aws.ddb.operation | UpdateItem |
| aws.ddb.tableName | movies |
| aws.ddb.item.id | 1 |
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-getItem.feature
similarity index 61%
copy from
tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
copy to
tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-getItem.feature
index f9b889ac..51db8cf7 100644
---
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature
+++
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-getItem.feature
@@ -15,45 +15,37 @@
# limitations under the License.
# ---------------------------------------------------------------------------
-Feature: AWS DDB Sink - PutItem
+Feature: AWS DDB Source - GetItem
Background:
Given variables
+ | maxRetryAttempts | 20 |
| timer.source.period | 10000 |
- | aws.ddb.operation | PutItem |
+ | aws.ddb.streams | true |
| aws.ddb.tableName | movies |
| aws.ddb.item.id | 1 |
| aws.ddb.item.year | 1977 |
| aws.ddb.item.title | Star Wars IV |
- | aws.ddb.json.data | { "id":${aws.ddb.item.id},
"year":${aws.ddb.item.year}, "title":"${aws.ddb.item.title}" } |
Scenario: Create infrastructure
# Start LocalStack container
Given Enable service DYNAMODB
Given start LocalStack container
- Scenario: Verify AWS-DDB Kamelet sink binding
- Given variables
- | maxRetryAttempts | 20 |
- | aws.ddb.items | [] |
+ Scenario: Verify AWS-DDB Kamelet source binding
# Create AWS-DDB client
Given load to Camel registry amazonDDBClient.groovy
- # Verify empty items on AWS-DDB
- Then apply actions verifyItems.groovy
-
# Create binding
- When load Pipe aws-ddb-sink-pipe.yaml
- And Pipe aws-ddb-sink-pipe is available
- And Camel K integration aws-ddb-sink-pipe is running
- And Camel K integration aws-ddb-sink-pipe should print Started
aws-ddb-sink-pipe
- # Verify Kamelet sink
- Given variables
- | maxRetryAttempts | 20 |
- | aws.ddb.items | [[year:AttributeValue(N=${aws.ddb.item.year}),
id:AttributeValue(N=${aws.ddb.item.id}),
title:AttributeValue(S=${aws.ddb.item.title})]] |
- Then apply actions verifyItems.groovy
+ When load Pipe aws-ddb-source-pipe.yaml
+ And Pipe aws-ddb-source-pipe is available
+ And Camel K integration aws-ddb-source-pipe is running
+ And Camel K integration aws-ddb-source-pipe should print Started
aws-ddb-source-pipe
+ # Create item on AWS-DDB
+ Given run script putItem.groovy
+ And Camel K integration aws-ddb-source-pipe should print Star Wars IV
Scenario: Remove resources
# Remove Camel K binding
- Given delete Pipe aws-ddb-sink-pipe
+ Given delete Pipe aws-ddb-source-pipe
# Stop LocalStack container
Given stop LocalStack container
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-pipe.yaml
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-pipe.yaml
new file mode 100644
index 00000000..3dccb80e
--- /dev/null
+++
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-pipe.yaml
@@ -0,0 +1,42 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+ name: aws-ddb-source-pipe
+spec:
+ source:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: aws-ddb-streams-source
+ properties:
+ table: ${aws.ddb.tableName}
+ streamIteratorType: FROM_START
+ overrideEndpoint: true
+ uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_DYNAMODB_LOCAL_URL}
+ accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}
+ secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}
+ region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}
+ sink:
+ ref:
+ kind: Kamelet
+ apiVersion: camel.apache.org/v1
+ name: log-sink
+ properties:
+ showHeaders: true
diff --git
a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml
index dbd40ded..38c2609b 100644
--- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml
+++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml
@@ -39,6 +39,7 @@ config:
- verifyItems.groovy
- amazonDDBClient.groovy
- aws-ddb-sink-pipe.yaml
+ - aws-ddb-source-pipe.yaml
cucumber:
tags:
- "not @ignored"
diff --git
a/tests/camel-kamelets-itest/src/test/resources/citrus-application.properties
b/tests/camel-kamelets-itest/src/test/resources/citrus-application.properties
index 5f768fab..80c270af 100644
---
a/tests/camel-kamelets-itest/src/test/resources/citrus-application.properties
+++
b/tests/camel-kamelets-itest/src/test/resources/citrus-application.properties
@@ -8,8 +8,8 @@ citrus.type.converter=camel
yaks.cluster.type=local
yaks.camelk.max.attempts=10
-yaks.jbang.camel.version=4.5.0
-yaks.jbang.kamelets.version=4.6.0-SNAPSHOT
+yaks.jbang.camel.version=4.4.1
+yaks.jbang.kamelets.version=4.4.2-SNAPSHOT
yaks.jbang.kamelets.local.dir=../../../kamelets
yaks.jbang.camel.dump.integration.output=true