This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 5a18c7d Intermittent failures in Aws2SqsSnsIT fix #2741
5a18c7d is described below
commit 5a18c7d3dadcfa0148f4a5236a4ebd0addb83b74
Author: Peter Palaga <[email protected]>
AuthorDate: Tue Jun 8 10:27:08 2021 +0200
Intermittent failures in Aws2SqsSnsIT fix #2741
---
.../aws2/ddb/it/Aws2DdbStreamResource.java | 37 ++---------
.../component/aws2/ddb/it/Aws2DdbStreamRoutes.java | 72 ++++++++++++++++++++++
.../quarkus/component/aws2/ddb/it/Aws2DdbTest.java | 40 +++++++-----
3 files changed, 101 insertions(+), 48 deletions(-)
diff --git a/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java b/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java
index 654f121..bb41701 100644
--- a/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java
+++ b/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamResource.java
@@ -16,22 +16,18 @@
*/
package org.apache.camel.quarkus.component.aws2.ddb.it;
-import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import javax.enterprise.context.ApplicationScoped;
-import javax.enterprise.event.Observes;
import javax.inject.Inject;
+import javax.inject.Named;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
-import io.quarkus.runtime.StartupEvent;
-import org.apache.camel.ConsumerTemplate;
import org.eclipse.microprofile.config.inject.ConfigProperty;
-import software.amazon.awssdk.services.dynamodb.model.Record;
-import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
@Path("/aws2-ddbstream")
@ApplicationScoped
@@ -41,35 +37,14 @@ public class Aws2DdbStreamResource {
String tableName;
@Inject
- ConsumerTemplate consumerTemplate;
-
- void startup(@Observes StartupEvent event) {
- /* Hit the consumer URI at application startup so that the consumer starts polling eagerly */
- consumerTemplate.receiveBody(componentUri(), 1000);
- }
+ @Named("aws2DdbStreamReceivedEvents")
+ List<Map<String, String>> aws2DdbStreamReceivedEvents;
@Path("/change")
@GET
@Produces(MediaType.APPLICATION_JSON)
- public Map<String, String> change() {
- Map<String, String> result = new LinkedHashMap<>();
- Record record = consumerTemplate.receiveBody(componentUri(), 10000, Record.class);
- if (record == null) {
- return null;
- }
- StreamRecord item = record.dynamodb();
- result.put("key", item.keys().get("key").s());
- if (item.hasOldImage()) {
- result.put("old", item.oldImage().get("value").s());
- }
- if (item.hasNewImage()) {
- result.put("new", item.newImage().get("value").s());
- }
- return result;
- }
-
- private String componentUri() {
- return "aws2-ddbstream://" + tableName;
+ public List<Map<String, String>> change() {
+ return aws2DdbStreamReceivedEvents;
}
}
diff --git a/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamRoutes.java b/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamRoutes.java
new file mode 100644
index 0000000..41a4768
--- /dev/null
+++ b/integration-test-groups/aws2/aws2-ddb/src/main/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbStreamRoutes.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.ddb.it;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Singleton;
+import javax.ws.rs.Produces;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
+
+@ApplicationScoped
+public class Aws2DdbStreamRoutes extends RouteBuilder {
+
+ @ConfigProperty(name = "aws-ddb.table-name")
+ String tableName;
+
+ @Inject
+ @Named("aws2DdbStreamReceivedEvents")
+ List<Map<String, String>> aws2DdbStreamReceivedEvents;
+
+ @Override
+ public void configure() throws Exception {
+ from("aws2-ddbstream://" + tableName)
+ .process(e -> {
+ Record record = e.getMessage().getBody(Record.class);
+ StreamRecord item = record.dynamodb();
+ Map<String, String> result = new LinkedHashMap<>();
+ result.put("key", item.keys().get("key").s());
+ if (item.hasOldImage()) {
+ result.put("old", item.oldImage().get("value").s());
+ }
+ if (item.hasNewImage()) {
+ result.put("new", item.newImage().get("value").s());
+ }
+ aws2DdbStreamReceivedEvents.add(result);
+ });
+ }
+
+ static class Producers {
+ @Singleton
+ @javax.enterprise.inject.Produces
+ @Named("aws2DdbStreamReceivedEvents")
+ List<Map<String, String>> aws2DdbStreamReceivedEvents() {
+ return new CopyOnWriteArrayList<>();
+ }
+ }
+
+}
diff --git a/integration-test-groups/aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java b/integration-test-groups/aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java
index 2ebf6d8..46890c4 100644
--- a/integration-test-groups/aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java
+++ b/integration-test-groups/aws2/aws2-ddb/src/test/java/org/apache/camel/quarkus/component/aws2/ddb/it/Aws2DdbTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.quarkus.component.aws2.ddb.it;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -101,23 +102,28 @@ class Aws2DdbTest {
},
Matchers.is(204));
- /* The above actions should trigger the following three change events */
- RestAssured.get("/aws2-ddbstream/change")
- .then()
- .statusCode(200)
- .body("key", Matchers.is(key))
- .body("new", Matchers.is(msg));
- RestAssured.get("/aws2-ddbstream/change")
- .then()
- .statusCode(200)
- .body("key", Matchers.is(key))
- .body("old", Matchers.is(msg))
- .body("new", Matchers.is(newMsg));
- RestAssured.get("/aws2-ddbstream/change")
- .then()
- .statusCode(200)
- .body("key", Matchers.is(key))
- .body("old", Matchers.is(newMsg));
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).until(
+ () -> {
+ ExtractableResponse<Response> result = RestAssured.get("/aws2-ddbstream/change")
+ .then()
+ .statusCode(200)
+ .extract();
+
+ LOG.info("Expecting 3 events got " + result.statusCode() + ": " + result.body().asString());
+ return result.jsonPath().getList("$", Map.class);
+ },
+ /* The above actions should trigger the following three change events */
+ list -> list.size() == 3
+
+ && key.equals(list.get(0).get("key"))
+ && msg.equals(list.get(0).get("new"))
+
+ && key.equals(list.get(1).get("key"))
+ && msg.equals(list.get(1).get("old"))
+ && newMsg.equals(list.get(1).get("new"))
+
+ && key.equals(list.get(2).get("key"))
+ && newMsg.equals(list.get(2).get("old")));
}