This is an automated email from the ASF dual-hosted git repository. cdeppisch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets.git
The following commit(s) were added to refs/heads/main by this push: new 51178428 Fix AWS DDB Streams Source Kamelet 51178428 is described below commit 51178428b06d4903dd24abeee19b43bb24509a0d Author: Christoph Deppisch <cdeppi...@redhat.com> AuthorDate: Mon Apr 29 11:06:45 2024 +0200 Fix AWS DDB Streams Source Kamelet - Use specific data type transformation to marshal the aws2-ddb record domain model to Json - Arbitrary Gson marshalling is not able to handle Java time instant type that is being used in aws2-ddb domain model - Add Java time instant Gson type adapter - Use Gson with time instant type adapter in aws2-ddb Json Struct data type transformer - Add YAKS test to verify aws2-ddb-streams-source Kamelet --- kamelets/aws-ddb-streams-source.kamelet.yaml | 10 ++--- library/camel-kamelets-utils/pom.xml | 7 ++++ .../gson/JavaTimeInstantTypeAdapter.java | 44 +++++++++++++++++++++ .../ddb/Ddb2JsonStructDataTypeTransformer.java | 46 ++++++++++++++++++++++ .../transformer/aws2-ddb-application-x-struct | 1 + .../transformer/aws2-ddb-application-x-struct.json | 13 ++++++ .../kamelets/aws-ddb-streams-source.kamelet.yaml | 10 ++--- .../test/resources/aws/ddb/amazonDDBClient.groovy | 9 +++++ .../aws/ddb/aws-ddb-sink-deleteItem.feature | 1 + .../resources/aws/ddb/aws-ddb-sink-putItem.feature | 1 + .../aws/ddb/aws-ddb-sink-updateItem.feature | 1 + ...Item.feature => aws-ddb-source-getItem.feature} | 32 ++++++--------- .../resources/aws/ddb/aws-ddb-source-pipe.yaml | 42 ++++++++++++++++++++ .../src/test/resources/aws/ddb/yaks-config.yaml | 1 + 14 files changed, 188 insertions(+), 30 deletions(-) diff --git a/kamelets/aws-ddb-streams-source.kamelet.yaml b/kamelets/aws-ddb-streams-source.kamelet.yaml index ddd55c3d..0ac85298 100644 --- a/kamelets/aws-ddb-streams-source.kamelet.yaml +++ b/kamelets/aws-ddb-streams-source.kamelet.yaml @@ -67,7 +67,7 @@ spec: enum: ["ap-south-1", "eu-south-1", "us-gov-east-1", "me-central-1", "ca-central-1", "eu-central-1", "us-iso-west-1", 
"us-west-1", "us-west-2", "af-south-1", "eu-north-1", "eu-west-3", "eu-west-2", "eu-west-1", "ap-northeast-3", "ap-northeast-2", "ap-northeast-1", "me-south-1", "sa-east-1", "ap-east-1", "cn-north-1", "us-gov-west-1", "ap-southeast-1", "ap-southeast-2", "us-iso-east-1", "ap-southeast-3", "us-east-1", "us-east-2", "cn-northwest-1", "us-isob-east-1", "aws-global", "a [...] streamIteratorType: title: Stream Iterator Type - description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time. + description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time. type: string default: FROM_LATEST useDefaultCredentialsProvider: @@ -107,7 +107,7 @@ spec: description: Default Json representation of a DDB Stream Event. mediaType: application/json cloudevents: - format: "aws2-sqs:application-cloudevents" + format: "aws2-ddbstream:application-cloudevents" description: |- Data type transformer converts AWS Dynamo DB Streams get records response to CloudEvent v1_0 data format. The data type sets Camel specific CloudEvent headers with values extracted from AWS Dynamo DB Streams get records. @@ -134,6 +134,7 @@ spec: description: The exchange creation timestamp as event time. 
type: string dependencies: + - mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.6.0-SNAPSHOT - "camel:gson" - "camel:aws2-ddb" - "camel:kamelet" @@ -150,7 +151,6 @@ spec: overrideEndpoint: "{{overrideEndpoint}}" delay: "{{delay}}" steps: - - marshal: - json: - library: Gson + - transform: + toType: "aws2-ddb:application-x-struct" - to: "kamelet:sink" diff --git a/library/camel-kamelets-utils/pom.xml b/library/camel-kamelets-utils/pom.xml index 9499fe65..5363a839 100644 --- a/library/camel-kamelets-utils/pom.xml +++ b/library/camel-kamelets-utils/pom.xml @@ -71,6 +71,13 @@ <artifactId>camel-kafka</artifactId> </dependency> + <!-- Dependencies for Gson serialization type adapter --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-gson</artifactId> + <scope>provided</scope> + </dependency> + <!-- Dependencies for azure credential configuration --> <dependency> <groupId>org.apache.camel</groupId> diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/gson/JavaTimeInstantTypeAdapter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/gson/JavaTimeInstantTypeAdapter.java new file mode 100644 index 00000000..eaeb5e7a --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/gson/JavaTimeInstantTypeAdapter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.serialization.gson; + +import java.lang.reflect.Type; +import java.time.Instant; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +public class JavaTimeInstantTypeAdapter implements JsonSerializer<Instant>, JsonDeserializer<Instant> { + + @Override + public JsonElement serialize(final Instant time, final Type typeOfSrc, + final JsonSerializationContext context) { + return new JsonPrimitive(time.getEpochSecond() * 1000); + } + + @Override + public Instant deserialize(final JsonElement json, final Type typeOfT, + final JsonDeserializationContext context) throws JsonParseException { + return Instant.ofEpochMilli(json.getAsLong()); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java new file mode 100644 index 00000000..0a204e0f --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws2/ddb/Ddb2JsonStructDataTypeTransformer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.transform.aws2.ddb; + +import java.time.Instant; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.camel.Message; +import org.apache.camel.kamelets.utils.serialization.gson.JavaTimeInstantTypeAdapter; +import org.apache.camel.spi.DataType; +import org.apache.camel.spi.DataTypeTransformer; +import org.apache.camel.spi.Transformer; + +@DataTypeTransformer(name = "aws2-ddb:application-x-struct", + description = "Transforms DynamoDB record into a Json node") +public class Ddb2JsonStructDataTypeTransformer extends Transformer { + + private final Gson gson = new GsonBuilder() + .registerTypeAdapter(Instant.class, new JavaTimeInstantTypeAdapter()) + .create(); + + @Override + public void transform(Message message, DataType fromType, DataType toType) { + if (message.getBody() instanceof String) { + return; + } + + message.setBody(gson.toJson(message.getBody())); + } +} diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct new file mode 100644 index 00000000..32bb47c1 --- /dev/null +++ 
b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct @@ -0,0 +1 @@ +class=org.apache.camel.kamelets.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json new file mode 100644 index 00000000..c69ebae1 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/transformer/aws2-ddb-application-x-struct.json @@ -0,0 +1,13 @@ +{ + "transformer": { + "kind": "transformer", + "name": "aws2-ddb:application-x-struct", + "title": "Aws2 Ddb (Application Json Struct)", + "description": "Transforms DynamoDB record into a Json node", + "deprecated": false, + "javaType": "org.apache.camel.kamelets.utils.transform.aws2.ddb.Ddb2JsonStructDataTypeTransformer", + "groupId": "org.apache.camel", + "artifactId": "camel-aws2-ddb", + "version": "4.6.0-SNAPSHOT" + } +} diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml index ddd55c3d..0ac85298 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml @@ -67,7 +67,7 @@ spec: enum: ["ap-south-1", "eu-south-1", "us-gov-east-1", "me-central-1", "ca-central-1", "eu-central-1", "us-iso-west-1", "us-west-1", "us-west-2", "af-south-1", "eu-north-1", "eu-west-3", "eu-west-2", "eu-west-1", "ap-northeast-3", "ap-northeast-2", "ap-northeast-1", "me-south-1", "sa-east-1", "ap-east-1", "cn-north-1", "us-gov-west-1", "ap-southeast-1", "ap-southeast-2", "us-iso-east-1", "ap-southeast-3", 
"us-east-1", "us-east-2", "cn-northwest-1", "us-isob-east-1", "aws-global", "a [...] streamIteratorType: title: Stream Iterator Type - description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time. + description: Defines where in the DynamoDB stream to start getting records. There are two enums and the value can be one of FROM_LATEST and FROM_START. Note that using FROM_START can cause a significant delay before the stream has caught up to real-time. type: string default: FROM_LATEST useDefaultCredentialsProvider: @@ -107,7 +107,7 @@ spec: description: Default Json representation of a DDB Stream Event. mediaType: application/json cloudevents: - format: "aws2-sqs:application-cloudevents" + format: "aws2-ddbstream:application-cloudevents" description: |- Data type transformer converts AWS Dynamo DB Streams get records response to CloudEvent v1_0 data format. The data type sets Camel specific CloudEvent headers with values extracted from AWS Dynamo DB Streams get records. @@ -134,6 +134,7 @@ spec: description: The exchange creation timestamp as event time. 
type: string dependencies: + - mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.6.0-SNAPSHOT - "camel:gson" - "camel:aws2-ddb" - "camel:kamelet" @@ -150,7 +151,6 @@ spec: overrideEndpoint: "{{overrideEndpoint}}" delay: "{{delay}}" steps: - - marshal: - json: - library: Gson + - transform: + toType: "aws2-ddb:application-x-struct" - to: "kamelet:sink" diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy index 95e18b40..5c1d7ef1 100644 --- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy +++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/amazonDDBClient.groovy @@ -25,6 +25,8 @@ import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement import software.amazon.awssdk.services.dynamodb.model.KeyType import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType +import software.amazon.awssdk.services.dynamodb.model.StreamSpecification +import software.amazon.awssdk.services.dynamodb.model.StreamViewType DynamoDbClient amazonDDBClient = DynamoDbClient .builder() @@ -45,6 +47,13 @@ amazonDDBClient.createTable(b -> { b.attributeDefinitions( AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.N).build(), ) + + if (${aws.ddb.streams}) { + b.streamSpecification(StreamSpecification.builder() + .streamEnabled(true) + .streamViewType(StreamViewType.NEW_AND_OLD_IMAGES).build()) + } + b.provisionedThroughput( ProvisionedThroughput.builder() .readCapacityUnits(1L) diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature index 7f45f1d0..e1a731b7 100644 --- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature +++ 
b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-deleteItem.feature @@ -20,6 +20,7 @@ Feature: AWS DDB Sink - DeleteItem Background: Given variables | timer.source.period | 10000 | + | aws.ddb.streams | false | | aws.ddb.operation | DeleteItem | | aws.ddb.tableName | movies | | aws.ddb.item.id | 1 | diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature index f9b889ac..a3261236 100644 --- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature +++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature @@ -20,6 +20,7 @@ Feature: AWS DDB Sink - PutItem Background: Given variables | timer.source.period | 10000 | + | aws.ddb.streams | false | | aws.ddb.operation | PutItem | | aws.ddb.tableName | movies | | aws.ddb.item.id | 1 | diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature index 956c5d1e..02b07d93 100644 --- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature +++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-updateItem.feature @@ -20,6 +20,7 @@ Feature: AWS DDB Sink - UpdateItem Background: Given variables | timer.source.period | 10000 | + | aws.ddb.streams | false | | aws.ddb.operation | UpdateItem | | aws.ddb.tableName | movies | | aws.ddb.item.id | 1 | diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-getItem.feature similarity index 61% copy from tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature copy to tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-getItem.feature index f9b889ac..51db8cf7 100644 
--- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-sink-putItem.feature +++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-getItem.feature @@ -15,45 +15,37 @@ # limitations under the License. # --------------------------------------------------------------------------- -Feature: AWS DDB Sink - PutItem +Feature: AWS DDB Source - GetItem Background: Given variables + | maxRetryAttempts | 20 | | timer.source.period | 10000 | - | aws.ddb.operation | PutItem | + | aws.ddb.streams | true | | aws.ddb.tableName | movies | | aws.ddb.item.id | 1 | | aws.ddb.item.year | 1977 | | aws.ddb.item.title | Star Wars IV | - | aws.ddb.json.data | { "id":${aws.ddb.item.id}, "year":${aws.ddb.item.year}, "title":"${aws.ddb.item.title}" } | Scenario: Create infrastructure # Start LocalStack container Given Enable service DYNAMODB Given start LocalStack container - Scenario: Verify AWS-DDB Kamelet sink binding - Given variables - | maxRetryAttempts | 20 | - | aws.ddb.items | [] | + Scenario: Verify AWS-DDB Kamelet source binding # Create AWS-DDB client Given load to Camel registry amazonDDBClient.groovy - # Verify empty items on AWS-DDB - Then apply actions verifyItems.groovy - # Create binding - When load Pipe aws-ddb-sink-pipe.yaml - And Pipe aws-ddb-sink-pipe is available - And Camel K integration aws-ddb-sink-pipe is running - And Camel K integration aws-ddb-sink-pipe should print Started aws-ddb-sink-pipe - # Verify Kamelet sink - Given variables - | maxRetryAttempts | 20 | - | aws.ddb.items | [[year:AttributeValue(N=${aws.ddb.item.year}), id:AttributeValue(N=${aws.ddb.item.id}), title:AttributeValue(S=${aws.ddb.item.title})]] | - Then apply actions verifyItems.groovy + When load Pipe aws-ddb-source-pipe.yaml + And Pipe aws-ddb-source-pipe is available + And Camel K integration aws-ddb-source-pipe is running + And Camel K integration aws-ddb-source-pipe should print Started aws-ddb-source-pipe + # Create item on AWS-DDB + Given run script 
putItem.groovy + And Camel K integration aws-ddb-source-pipe should print Star Wars IV Scenario: Remove resources # Remove Camel K binding - Given delete Pipe aws-ddb-sink-pipe + Given delete Pipe aws-ddb-source-pipe # Stop LocalStack container Given stop LocalStack container diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-pipe.yaml b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-pipe.yaml new file mode 100644 index 00000000..3dccb80e --- /dev/null +++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/aws-ddb-source-pipe.yaml @@ -0,0 +1,42 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: aws-ddb-source-pipe +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: aws-ddb-streams-source + properties: + table: ${aws.ddb.tableName} + streamIteratorType: FROM_START + overrideEndpoint: true + uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_DYNAMODB_LOCAL_URL} + accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY} + secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY} + region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION} + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + showHeaders: true diff --git a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml index dbd40ded..38c2609b 100644 --- a/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml +++ b/tests/camel-kamelets-itest/src/test/resources/aws/ddb/yaks-config.yaml @@ -39,6 +39,7 @@ config: - verifyItems.groovy - amazonDDBClient.groovy - aws-ddb-sink-pipe.yaml + - aws-ddb-source-pipe.yaml cucumber: tags: - "not @ignored"