This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/3.8.x by this push:
new 74b847a350 Jt400: tests are not cleaning after themselves and parallel
run fails
74b847a350 is described below
commit 74b847a350c7bd018e4f4e881bfa2d61523d13eb
Author: JiriOndrusek <[email protected]>
AuthorDate: Mon Apr 15 13:07:33 2024 +0200
Jt400: tests are not cleaning after themselves and parallel run fails
---
integration-tests/jt400/README.adoc | 24 +-
integration-tests/jt400/pom.xml | 5 +
.../component/jt400/it/InquiryMessageHolder.java | 43 +++
.../quarkus/component/jt400/it/Jt400Resource.java | 97 ++++--
.../quarkus/component/jt400/it/Jt400Routes.java | 22 +-
.../src/main/resources/application.properties | 2 +-
.../quarkus/component/jt400/it/Jt400Test.java | 245 +++++++-------
.../component/jt400/it/Jt400TestResource.java | 352 +++++++++++++++++++++
8 files changed, 645 insertions(+), 145 deletions(-)
diff --git a/integration-tests/jt400/README.adoc
b/integration-tests/jt400/README.adoc
index 0fa4e22359..241dbf8c77 100644
--- a/integration-tests/jt400/README.adoc
+++ b/integration-tests/jt400/README.adoc
@@ -110,4 +110,26 @@ $Env:JT400_KEYED_QUEUE="#lkeyedqueue_if_not_TESTKEYED.DTAQ"
$Env:JT400_MESSAGE_QUEUE="#messagequeue_if_not_TESTMSGQ.MSGQ"
$Env:JT400_MESSAGE_REPLYTO_QUEUE="#messagequeueinquiry_if_not_REPLYMSGQ.MSGQ"
$Env:JT400_USER_SPACE="#userspace_if_not_PROGCALL"
-```
\ No newline at end of file
+```
+
+=== Clear queues after unexpected failures
+
+If the tests finish without an unexpected failure, they take care of
clearing the data.
+In some cases data might remain on the real server if a test fails
unexpectedly.
+This leftover state might alter the following executions.
+
+A full clear (of each queue) can be forced by adding the parameter
+```
+-Dcq.jt400.clear-all=true
+```
+Be aware that with `-Dcq.jt400.clear-all=true`, the tests cannot successfully
finish in a parallel run.
+
+Usage of the clear-queues parameter is *strongly* suggested during development.
+
+
+==== Parallel runs and locking
+
+A simple locking mechanism is implemented for the tests to allow parallel
executions.
+
+Whenever a test is started, a new entry is written into the keyed data queue
`JT400_KEYED_QUEUE` with the key `cq.jt400.global-lock`, and the entry is removed
after the run.
+Tests are able to clear this lock even if a previous execution failed
unexpectedly.
\ No newline at end of file
diff --git a/integration-tests/jt400/pom.xml b/integration-tests/jt400/pom.xml
index 8d8f352c70..22e1efd5fe 100644
--- a/integration-tests/jt400/pom.xml
+++ b/integration-tests/jt400/pom.xml
@@ -71,6 +71,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java
new file mode 100644
index 0000000000..18030cc163
--- /dev/null
+++
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/InquiryMessageHolder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jt400.it;
+
+import jakarta.inject.Singleton;
+
+@Singleton
+public class InquiryMessageHolder {
+
+ private String messageText;
+
+ private boolean processed = false;
+
+ public String getMessageText() {
+ return messageText;
+ }
+
+ public void setMessageText(String messageText) {
+ this.messageText = messageText;
+ }
+
+ public boolean isProcessed() {
+ return processed;
+ }
+
+ public void setProcessed(boolean processed) {
+ this.processed = processed;
+ }
+}
diff --git
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
index c65f268978..9fd65df78c 100644
---
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
+++
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Resource.java
@@ -16,19 +16,19 @@
*/
package org.apache.camel.quarkus.component.jt400.it;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import com.ibm.as400.access.AS400;
-import com.ibm.as400.access.MessageQueue;
import com.ibm.as400.access.QueuedMessage;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
@@ -80,6 +80,9 @@ public class Jt400Resource {
@Inject
CamelContext context;
+ @Inject
+ InquiryMessageHolder inquiryMessageHolder;
+
@Path("/dataQueue/read/")
@POST
@Produces(MediaType.APPLICATION_JSON)
@@ -101,7 +104,7 @@ public class Jt400Resource {
Exchange ex =
consumerTemplate.receive(getUrlForLibrary(suffix.toString()));
if ("binary".equals(format)) {
- return generateResponse(new
String(ex.getIn().getBody(byte[].class), Charset.forName("Cp037")), ex);
+ return generateResponse(new
String(ex.getIn().getBody(byte[].class), StandardCharsets.UTF_8), ex);
}
return generateResponse(ex.getIn().getBody(String.class), ex);
@@ -112,61 +115,85 @@ public class Jt400Resource {
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response keyedDataQueueWrite(@QueryParam("key") String key,
- @QueryParam("searchType") String searchType,
+ @QueryParam("format") String format,
String data) {
+ String _format = Optional.ofNullable(format).orElse("text");
boolean keyed = key != null;
StringBuilder suffix = new StringBuilder();
Map<String, Object> headers = new HashMap<>();
+ String msg;
if (keyed) {
- suffix.append(jt400KeyedQueue).append("?keyed=true");
+
suffix.append(jt400KeyedQueue).append("?keyed=true").append("&format=").append(_format);
headers.put(Jt400Endpoint.KEY, key);
+ msg = "Hello From KDQ: " + data;
} else {
- suffix.append(jt400LifoQueue);
+ suffix.append(jt400LifoQueue).append("?format=").append(_format);
+ msg = "Hello From DQ: " + data;
}
- Object ex = producerTemplate.requestBodyAndHeaders(
- getUrlForLibrary(suffix.toString()),
- "Hello " + data,
- headers);
- return Response.ok().entity(ex).build();
+ Object retVal;
+ if ("binary".equals(format)) {
+ byte[] result = (byte[]) producerTemplate.requestBodyAndHeaders(
+ getUrlForLibrary(suffix.toString()),
+ ("Hello (bin) " + data).getBytes(StandardCharsets.UTF_8),
+ headers);
+ retVal = new String(result, StandardCharsets.UTF_8);
+ } else {
+ retVal = producerTemplate.requestBodyAndHeaders(
+ getUrlForLibrary(suffix.toString()),
+ msg,
+ headers);
+ }
+
+ return Response.ok().entity(retVal).build();
}
- @Path("/client/inquiryMessage/write/")
- @POST
+ @Path("/route/start/{route}")
+ @GET
@Produces(MediaType.TEXT_PLAIN)
- public Response clientInquiryMessageWrite(String data) throws Exception {
- Jt400Endpoint jt400Endpoint =
context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue),
Jt400Endpoint.class);
- AS400 as400 = jt400Endpoint.getConfiguration().getConnection();
- //send inquiry message (with the same client as is used in the
component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another
job`.
- MessageQueue queue = new MessageQueue(as400,
jt400Endpoint.getConfiguration().getObjectPath());
- try {
- queue.sendInquiry(data, "/QSYS.LIB/" + jt400Library + ".LIB/" +
jt400MessageReplyToQueue);
- } catch (Exception e) {
- return Response.status(500).entity(e.getMessage()).build();
+ public Response startRoute(@PathParam("route") String routeName) throws
Exception {
+ if
(context.getRouteController().getRouteStatus(routeName).isStartable()) {
+ context.getRouteController().startRoute(routeName);
}
- return Response.ok().build();
+
+ return
Response.ok().entity(context.getRouteController().getRouteStatus(routeName).isStarted()).build();
}
- @Path("/client/queuedMessage/read")
- @POST
+ @Path("/route/stop/{route}")
+ @GET
@Produces(MediaType.TEXT_PLAIN)
- public Response clientQueuedMessageRead(String queueName) throws Exception
{
+ public Response stopRoute(@PathParam("route") String routeName) throws
Exception {
+ if
(context.getRouteController().getRouteStatus(routeName).isStoppable()) {
+ context.getRouteController().stopRoute(routeName);
+ }
+ boolean resp =
context.getRouteController().getRouteStatus(routeName).isStopped();
+
+ //stop component to avoid CPF2451 Message queue REPLYMSGQ is allocated
to another job.
+ Jt400Endpoint jt400Endpoint =
context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue),
Jt400Endpoint.class);
+ jt400Endpoint.close();
- Jt400Endpoint jt400Endpoint =
context.getEndpoint(getUrlForLibrary(queueName), Jt400Endpoint.class);
- AS400 as400 = jt400Endpoint.getConfiguration().getConnection();
- //send inquiry message (with the same client as is used in the
component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another
job`.
- MessageQueue queue = new MessageQueue(as400,
jt400Endpoint.getConfiguration().getObjectPath());
- QueuedMessage message = queue.receive(null);
+ return Response.ok().entity(resp).build();
+ }
- return Response.ok().entity(message != null ? message.getText() :
"").build();
+ @Path("/inquiryMessageSetExpected")
+ @POST
+ public void inquiryMessageSetExpected(String msg) {
+ inquiryMessageHolder.setMessageText(msg);
+ }
+
+ @Path("/inquiryMessageProcessed")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String inquiryMessageProcessed() {
+ return String.valueOf(inquiryMessageHolder.isProcessed());
}
@Path("/messageQueue/write/")
@POST
@Produces(MediaType.TEXT_PLAIN)
public Response messageQueueWrite(String data) {
- Object ex =
producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello " +
data);
+ Object ex =
producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello from
MQ: " + data);
return Response.ok().entity(ex).build();
}
@@ -176,7 +203,7 @@ public class Jt400Resource {
@Produces(MediaType.APPLICATION_JSON)
public Response messageQueueRead(@QueryParam("queue") String queue) {
Exchange ex = consumerTemplate
- .receive(getUrlForLibrary(queue == null ? jt400MessageQueue :
queue));
+ .receive(getUrlForLibrary(queue == null ? jt400MessageQueue :
queue) + "?messageAction=SAME");
return generateResponse(ex.getIn().getBody(String.class), ex);
}
diff --git
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
index 5615454c86..a3ce493919 100644
---
a/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
+++
b/integration-tests/jt400/src/main/java/org/apache/camel/quarkus/component/jt400/it/Jt400Routes.java
@@ -18,12 +18,15 @@ package org.apache.camel.quarkus.component.jt400.it;
import com.ibm.as400.access.AS400Message;
import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jt400.Jt400Constants;
import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
@ApplicationScoped
public class Jt400Routes extends RouteBuilder {
+ private static final Logger LOGGER = Logger.getLogger(Jt400Routes.class);
@ConfigProperty(name = "cq.jt400.library")
String jt400Library;
@@ -40,16 +43,31 @@ public class Jt400Routes extends RouteBuilder {
@ConfigProperty(name = "cq.jt400.message-replyto-queue")
String jt400MessageReplyToQueue;
+ @Inject
+ InquiryMessageHolder inquiryMessageHolder;
+
@Override
public void configure() throws Exception {
from(getUrlForLibrary(jt400MessageReplyToQueue + "?sendingReply=true"))
+ .id("inquiryRoute")
+ //route has to be stopped to avoid "CPF2451 Message queue
REPLYMSGQ is allocated to another job."
+ .autoStartup(false)
.choice()
.when(header(Jt400Constants.MESSAGE_TYPE).isEqualTo(AS400Message.INQUIRY))
.process((exchange) -> {
- String reply = "reply to: " +
exchange.getIn().getBody(String.class);
+ String msg = exchange.getIn().getBody(String.class);
+ LOGGER.debug(
+ "Inquiry route: received '" + msg + "' (expecting
'" + inquiryMessageHolder.getMessageText()
+ + "')");
+ if (inquiryMessageHolder.getMessageText() != null &&
!inquiryMessageHolder.getMessageText().equals(msg)) {
+ throw new IllegalStateException(
+ "Intentional! Current exchange is not
triggered by current test process, therefore ignoring the exchange");
+ }
+ String reply = "reply to: " + msg;
exchange.getIn().setBody(reply);
})
- .to(getUrlForLibrary(jt400MessageReplyToQueue));
+ .to(getUrlForLibrary(jt400MessageReplyToQueue))
+ .process(e -> inquiryMessageHolder.setProcessed(true));
}
private String getUrlForLibrary(String suffix) {
diff --git a/integration-tests/jt400/src/main/resources/application.properties
b/integration-tests/jt400/src/main/resources/application.properties
index db47b32aab..296c10e1a0 100644
--- a/integration-tests/jt400/src/main/resources/application.properties
+++ b/integration-tests/jt400/src/main/resources/application.properties
@@ -31,4 +31,4 @@ cq.jt400.user-space=${JT400_USER_SPACE:PROGCALL}
cq.jt400.message-queue=${JT400_MESSAGE_QUEUE:TESTMSGQ.MSGQ}
cq.jt400.message-replyto-queue=${JT400_MESSAGE_REPLYTO_QUEUE:REPLYMSGQ.MSGQ}
cq.jt400.keyed-queue=${JT400_KEYED_QUEUE:TESTKEYED.DTAQ}
-cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
+cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
\ No newline at end of file
diff --git
a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
index c1168b7644..3627dd198a 100644
---
a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
+++
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400Test.java
@@ -16,117 +16,117 @@
*/
package org.apache.camel.quarkus.component.jt400.it;
-import java.io.IOException;
import java.util.Locale;
-import java.util.function.BiFunction;
-
-import com.ibm.as400.access.AS400;
-import com.ibm.as400.access.AS400SecurityException;
-import com.ibm.as400.access.ErrorCompletingRequestException;
-import com.ibm.as400.access.KeyedDataQueue;
-import com.ibm.as400.access.MessageQueue;
-import com.ibm.as400.access.ObjectDoesNotExistException;
+import java.util.concurrent.TimeUnit;
+
+import com.ibm.as400.access.QueuedMessage;
+import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import org.apache.camel.component.jt400.Jt400Constants;
import org.apache.commons.lang3.RandomStringUtils;
-import org.eclipse.microprofile.config.ConfigProvider;
+import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
@QuarkusTest
@EnabledIfEnvironmentVariable(named = "JT400_URL", matches = ".+")
+@QuarkusTestResource(Jt400TestResource.class)
public class Jt400Test {
+ private static final Logger LOGGER = Logger.getLogger(Jt400Test.class);
+
+ private final int MSG_LENGTH = 20;
+ //tests may be executed in parallel, therefore the timeout is a little
bigger in case the test has to wait for another one
+ private final int WAIT_IN_SECONDS = 20;
@BeforeAll
public static void beforeAll() throws Exception {
- //read all messages from the queues to be sure that they are empty
-
- //clear reply-to message queue
- clearQueue("cq.jt400.message-replyto-queue",
- (as400, path) -> {
- try {
- return new MessageQueue(as400, path).receive(null);
- } catch (Exception e) {
- return null;
- }
- });
-
- //clear message queue
- clearQueue("cq.jt400.message-queue",
- (as400, path) -> {
- try {
- return new MessageQueue(as400, path).receive(null);
- } catch (Exception e) {
- return null;
- }
- });
-
- //clear keyed queue for key1
- clearQueue("cq.jt400.message-queue",
- (as400, path) -> {
- try {
- return new KeyedDataQueue(as400, path).read("key1");
- } catch (Exception e) {
- return null;
- }
- });
-
- //clear keyed queue for key2
- clearQueue("cq.jt400.message-queue",
- (as400, path) -> {
- try {
- return new KeyedDataQueue(as400, path).read("key1");
- } catch (Exception e) {
- return null;
- }
- });
+ //for development purposes
+ // logQueues();
+
+ //lock execution
+ Jt400TestResource.CLIENT_HELPER.lock();
+ }
+
+ @AfterAll
+ public static void afterAll() throws Exception {
+ getClientHelper().unlock();
+ }
+
+ private static void logQueues() throws Exception {
+ StringBuilder sb = new StringBuilder("\n");
+
sb.append("**********************************************************");
+ sb.append(getClientHelper().dumpQueues());
+
sb.append("\n**********************************************************\n");
+ LOGGER.info(sb.toString());
}
@Test
public void testDataQueue() {
- String msg =
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+ LOGGER.debug("** testDataQueue() ** has started ");
+
+ String msg =
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+ String answer = "Hello From DQ: " + msg;
RestAssured.given()
.body(msg)
.post("/jt400/dataQueue/write")
.then()
.statusCode(200)
- .body(Matchers.equalTo("Hello " + msg));
+ .body(Matchers.equalTo(answer));
+
+ LOGGER.debug("testDataQueue: message '" + answer + "' was written. ");
+
+
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.lifoQueueu,
answer);
RestAssured.post("/jt400/dataQueue/read")
.then()
.statusCode(200)
- .body("result", Matchers.equalTo("Hello " + msg));
+ .body("result", Matchers.equalTo(answer));
}
@Test
- public void testDataQueueBinary() {
- String msg =
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+ public void testDataQueueBinary() throws Exception {
+ LOGGER.debug("** testDataQueueBinary() ** has started ");
+ String msg =
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+ String answer = "Hello (bin) " + msg;
RestAssured.given()
.body(msg)
+ .queryParam("format", "binary")
.post("/jt400/dataQueue/write")
.then()
.statusCode(200)
- .body(Matchers.equalTo("Hello " + msg));
+ .body(Matchers.equalTo(answer));
+
+ LOGGER.debug("testDataQueueBinary: message '" + answer + "' was
written. ");
+
+ //register to delete
+
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.lifoQueueu,
answer);
RestAssured.given()
.queryParam("format", "binary")
.post("/jt400/dataQueue/read")
.then()
.statusCode(200)
- .body("result", Matchers.equalTo("Hello " + msg));
+ .body("result", Matchers.equalTo(answer));
}
@Test
public void testKeyedDataQueue() {
- String msg1 =
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
- String msg2 =
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
- String key1 = "key1";
- String key2 = "key2";
+ LOGGER.debug("** testKeyedDataQueue() ** has started ");
+ String msg1 =
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+ String msg2 =
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+ String answer1 = "Hello From KDQ: " + msg1;
+ String answer2 = "Hello From KDQ: " + msg2;
+
+ String key1 = RandomStringUtils.randomAlphanumeric(MSG_LENGTH -
1).toLowerCase(Locale.ROOT);
+ //key2 is right after key1
+ String key2 = key1 + "a";
RestAssured.given()
.body(msg1)
@@ -134,79 +134,131 @@ public class Jt400Test {
.post("/jt400/dataQueue/write/")
.then()
.statusCode(200)
- .body(Matchers.equalTo("Hello " + msg1));
+ .body(Matchers.equalTo(answer1));
+
+ LOGGER.debug("testKeyedDataQueue: message '" + answer1 + " (key " +
key1 + ") was written. ");
+
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.keyedDataQue,
key1);
RestAssured.given()
- .body("Sheldon2")
+ .body(msg2)
.queryParam("key", key2)
.post("/jt400/dataQueue/write/")
.then()
.statusCode(200)
- .body(Matchers.equalTo("Hello Sheldon2"));
+ .body(Matchers.equalTo(answer2));
+
+ LOGGER.debug("testKeyedDataQueue: message '" + answer2 + " (key " +
key2 + ") was written. ");
+
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.keyedDataQue,
key2);
RestAssured.given()
.body(key1)
.post("/jt400/dataQueue/read/")
.then()
.statusCode(200)
- .body("result", Matchers.equalTo("Hello " + msg1))
+ .body("result", Matchers.equalTo(answer1))
.body(Jt400Constants.KEY, Matchers.equalTo(key1));
RestAssured.given()
.body(key1)
- .queryParam("searchType", "NE")
+ .queryParam("searchType", "GE")
.post("/jt400/dataQueue/read/")
.then()
.statusCode(200)
- .body("result", Matchers.not(Matchers.equalTo("Hello " +
msg2)))
+ .body("result", Matchers.not(Matchers.equalTo(answer1)))
.body(Jt400Constants.KEY, Matchers.equalTo(key2));
}
@Test
- public void testMessageQueue() throws AS400SecurityException,
ObjectDoesNotExistException, IOException,
- InterruptedException, ErrorCompletingRequestException {
- String msg =
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+ public void testMessageQueue() throws Exception {
+ LOGGER.debug("** testMessageQueue() ** has started ");
+ //write
+ String msg =
RandomStringUtils.randomAlphanumeric(MSG_LENGTH).toLowerCase(Locale.ROOT);
+ String answer = "Hello from MQ: " + msg;
RestAssured.given()
.body(msg)
.post("/jt400/messageQueue/write")
.then()
.statusCode(200)
- .body(Matchers.equalTo("Hello " + msg));
+ .body(Matchers.equalTo(answer));
+
+ LOGGER.debug("testMessageQueue: message '" + answer + "' was written.
");
+ //register to delete
+
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.messageQueue,
answer);
+
+ //read (the read message might be different in case the test runs in
parallel)
RestAssured.post("/jt400/messageQueue/read")
.then()
.statusCode(200)
- .body("result", Matchers.is("Hello " + msg))
//check of headers
.body(Jt400Constants.SENDER_INFORMATION,
Matchers.not(Matchers.empty()))
.body(Jt400Constants.MESSAGE_FILE, Matchers.is(""))
.body(Jt400Constants.MESSAGE_SEVERITY, Matchers.is(0))
.body(Jt400Constants.MESSAGE_ID, Matchers.is(""))
.body(Jt400Constants.MESSAGE_TYPE, Matchers.is(4))
- .body(Jt400Constants.MESSAGE, Matchers.is("QueuedMessage:
Hello " + msg));
+ .body(Jt400Constants.MESSAGE,
Matchers.startsWith("QueuedMessage: Hello "))
+ .body("result", Matchers.equalTo(answer));
//Jt400Constants.MESSAGE_DFT_RPY && Jt400Constants.MESSAGE_REPLYTO_KEY
are used only for a special
// type of message which can not be created by the camel component
(*INQUIRY)
}
@Test
- public void testInquiryMessageQueue() throws AS400SecurityException,
ObjectDoesNotExistException, IOException,
- InterruptedException, ErrorCompletingRequestException {
+ public void testInquiryMessageQueue() throws Exception {
+ LOGGER.debug("** testInquiryMessageQueue() **: has started ");
String msg =
RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT);
+ String replyMsg = "reply to: " + msg;
+
+ LOGGER.debug("testInquiryMessageQueue: writing " + msg);
//sending a message using the same client as component
- RestAssured.given()
- .body(msg)
- .post("/jt400/client/inquiryMessage/write")
- .then()
- .statusCode(200);
+ getClientHelper().sendInquiry(msg);
+ //register deletion of the message in case some following task fails
+ QueuedMessage queuedMessage =
getClientHelper().peekReplyToQueueMessage(msg);
+ if (queuedMessage != null) {
+
getClientHelper().registerForRemoval(Jt400TestResource.RESOURCE_TYPE.replyToQueueu,
queuedMessage.getKey());
+ LOGGER.debug("testInquiryMessageQueue: message confirmed by peek:
" + msg);
+ }
+
+ //set filter for expected messages (for parallel executions)
RestAssured.given()
-
.body(ConfigProvider.getConfig().getValue("cq.jt400.message-replyto-queue",
String.class))
- .post("/jt400/client/queuedMessage/read")
+ .body(msg)
+ .post("/jt400/inquiryMessageSetExpected")
.then()
- .statusCode(200)
- .body(Matchers.equalTo("reply to: " + msg));
+ .statusCode(204);
+ //start route before sending message (and wait for start)
+ Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
+ () -> RestAssured.get("/jt400/route/start/inquiryRoute")
+ .then()
+ .statusCode(200)
+ .extract().asString(),
+ Matchers.is(Boolean.TRUE.toString()));
+ LOGGER.debug("testInquiryMessageQueue: inquiry route started");
+
+ //await to be processed
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20,
TimeUnit.SECONDS).until(
+ () -> RestAssured.get("/jt400/inquiryMessageProcessed")
+ .then()
+ .statusCode(200)
+ .extract().asString(),
+ Matchers.is(String.valueOf(Boolean.TRUE)));
+ LOGGER.debug("testInquiryMessageQueue: inquiry message processed");
+
+ //stop route (and wait for stop)
+ Awaitility.await().atMost(WAIT_IN_SECONDS, TimeUnit.SECONDS).until(
+ () -> RestAssured.get("/jt400/route/stop/inquiryRoute")
+ .then()
+ .statusCode(200)
+ .extract().asString(),
+ Matchers.is(Boolean.TRUE.toString()));
+ LOGGER.debug("testInquiryMessageQueue: inquiry route stooped");
+
+ //check written message with client
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(20,
TimeUnit.SECONDS).until(
+ () -> getClientHelper().peekReplyToQueueMessage(replyMsg),
+ Matchers.notNullValue());
+ LOGGER.debug("testInquiryMessageQueue: reply message confirmed by
peek: " + replyMsg);
}
@Test
@@ -219,27 +271,8 @@ public class Jt400Test {
.body(Matchers.containsString("hello camel"));
}
- private static void clearQueue(String queue, BiFunction<AS400, String,
Object> readFromQueue) {
- String jt400Url = ConfigProvider.getConfig().getValue("cq.jt400.url",
String.class);
- String jt400Username =
ConfigProvider.getConfig().getValue("cq.jt400.username", String.class);
- String jt400Password =
ConfigProvider.getConfig().getValue("cq.jt400.password", String.class);
- String jt400Library =
ConfigProvider.getConfig().getValue("cq.jt400.library", String.class);
- String jt400MessageQueue = ConfigProvider.getConfig().getValue(queue,
String.class);
-
- String objectPath = String.format("/QSYS.LIB/%s.LIB/%s", jt400Library,
jt400MessageQueue);
-
- AS400 as400 = new AS400(jt400Url, jt400Username, jt400Password);
-
- int i = 0;
- Object msg = null;
- //read messages until null is received
- do {
- msg = readFromQueue.apply(as400, objectPath);
- } while (i++ < 10 && msg != null);
-
- if (i == 10 && msg != null) {
- throw new IllegalStateException("There is a message present in a
queue!");
- }
+ private static Jt400ClientHelper getClientHelper() {
+ return Jt400TestResource.CLIENT_HELPER;
}
}
diff --git
a/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
new file mode 100644
index 0000000000..3c09d0f562
--- /dev/null
+++
b/integration-tests/jt400/src/test/java/org/apache/camel/quarkus/component/jt400/it/Jt400TestResource.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.jt400.it;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.ibm.as400.access.AS400;
+import com.ibm.as400.access.AS400SecurityException;
+import com.ibm.as400.access.DataQueue;
+import com.ibm.as400.access.DataQueueEntry;
+import com.ibm.as400.access.ErrorCompletingRequestException;
+import com.ibm.as400.access.KeyedDataQueue;
+import com.ibm.as400.access.KeyedDataQueueEntry;
+import com.ibm.as400.access.MessageQueue;
+import com.ibm.as400.access.ObjectDoesNotExistException;
+import com.ibm.as400.access.QueuedMessage;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.hamcrest.Matchers;
+import org.jboss.logging.Logger;
+import org.junit.jupiter.api.Assertions;
+
+/**
+ * Quarkus test resource which owns the client connection to the external jt400 server used by the tests.
+ *
+ * The connection is created when the class is loaded; {@link #stop()} clears the data registered by the tests
+ * (or all data, if {@code cq.jt400.clear-all=true}) and closes the connection.
+ * The client-side operations used by the tests are exposed via {@link #CLIENT_HELPER}.
+ */
+public class Jt400TestResource implements QuarkusTestResourceLifecycleManager {
+    private static final Logger LOGGER = Logger.getLogger(Jt400TestResource.class);
+
+    /**
+     * Kinds of server objects for which written data can be registered for removal.
+     * The constant names (including their spelling) are referenced by the tests, therefore they are kept as they are.
+     */
+    public enum RESOURCE_TYPE {
+        messageQueue,
+        keyedDataQue,
+        lifoQueueu,
+        replyToQueueu;
+    }
+
+    private static final Optional<String> JT400_CLEAR_ALL = ConfigProvider.getConfig().getOptionalValue("cq.jt400.clear-all",
+            String.class);
+    private static final String JT400_URL = ConfigProvider.getConfig().getValue("cq.jt400.url", String.class);
+    private static final String JT400_USERNAME = ConfigProvider.getConfig().getValue("cq.jt400.username", String.class);
+    private static final String JT400_PASSWORD = ConfigProvider.getConfig().getValue("cq.jt400.password", String.class);
+    private static final String JT400_LIBRARY = ConfigProvider.getConfig().getValue("cq.jt400.library", String.class);
+    private static final String JT400_MESSAGE_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.message-queue",
+            String.class);
+    private static final String JT400_REPLY_TO_MESSAGE_QUEUE = ConfigProvider.getConfig().getValue(
+            "cq.jt400.message-replyto-queue",
+            String.class);
+    private static final String JT400_LIFO_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.lifo-queue",
+            String.class);
+    private static final String JT400_KEYED_QUEUE = ConfigProvider.getConfig().getValue("cq.jt400.keyed-queue", String.class);
+
+    //maximal number of repetitive reads used for the full clearing of the lifo queue
+    private static final int CLEAR_DEPTH = 100;
+    public static final String LOCK_KEY = "cq.jt400.global-lock";
+    //5 minute timeout (in milliseconds) to obtain a lock for the tests execution
+    private static final int LOCK_TIMEOUT = 300000;
+    //spare time (in milliseconds) added to the lock timeout, to have some time for removing old locks
+    private static final int LOCK_TIMEOUT_SLACK = 5000;
+
+    private static AS400 as400 = new AS400(JT400_URL, JT400_USERNAME, JT400_PASSWORD);
+
+    @Override
+    public Map<String, String> start() {
+        //no need to start, as the instance already exists
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Clears the data written by the tests and closes the connection.
+     * A failure of the clearing is only logged, so the connection is closed in any case.
+     */
+    @Override
+    public void stop() {
+        if (as400 == null) {
+            return;
+        }
+        boolean clearEverything = JT400_CLEAR_ALL.map(Boolean::parseBoolean).orElse(false);
+        try {
+            CLIENT_HELPER.clearAll(clearEverything);
+        } catch (Exception e) {
+            //leftover data may influence following executions, therefore the failure has to be visible in the log
+            LOGGER.warn("Clearing of the external queues failed", e);
+        } finally {
+            as400.close();
+        }
+    }
+
+    /**
+     * @param  object name of the object (including its type suffix) located in the configured library
+     * @return        IFS path of the object
+     */
+    private static String getObjectPath(String object) {
+        return String.format("/QSYS.LIB/%s.LIB/%s", JT400_LIBRARY, object);
+    }
+
+    public static final Jt400ClientHelper CLIENT_HELPER = new Jt400ClientHelper() {
+
+        //token of this participant written into the keyed data queue, null if the lock is not held (or requested)
+        private String key = null;
+        //values written by the tests, which have to be removed from the server
+        private final Map<RESOURCE_TYPE, Set<Object>> toRemove = new HashMap<>();
+
+        @Override
+        public QueuedMessage peekReplyToQueueMessage(String msg) throws Exception {
+            return getQueueMessage(JT400_REPLY_TO_MESSAGE_QUEUE, msg);
+        }
+
+        /**
+         * @return the first message from the queue with the text equal to {@code msg}, or null if there is no such message
+         */
+        private QueuedMessage getQueueMessage(String queue, String msg) throws Exception {
+            MessageQueue messageQueue = new MessageQueue(as400,
+                    getObjectPath(queue));
+            Enumeration<QueuedMessage> msgs = messageQueue.getMessages();
+
+            while (msgs.hasMoreElements()) {
+                QueuedMessage queuedMessage = msgs.nextElement();
+
+                if (msg.equals(queuedMessage.getText())) {
+                    return queuedMessage;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public void registerForRemoval(RESOURCE_TYPE type, Object value) {
+            toRemove.computeIfAbsent(type, t -> new HashSet<>()).add(value);
+        }
+
+        /**
+         * Removes the data from all queues used by the tests.
+         *
+         * @param all if true, the whole content of each queue is removed (including the locking tokens in the keyed
+         *            queue), otherwise only the values registered by {@link #registerForRemoval} are removed
+         */
+        @Override
+        public void clearAll(boolean all) throws Exception {
+            //message queue
+            MessageQueue mq = new MessageQueue(as400, getObjectPath(JT400_MESSAGE_QUEUE));
+            if (all) {
+                mq.remove();
+            } else {
+                clearMessageQueue(RESOURCE_TYPE.messageQueue, mq);
+            }
+
+            //lifo queue
+            DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
+            if (all) {
+                //read (therefore remove) entries until the queue is empty, CLEAR_DEPTH reads at most
+                for (int i = 0; i < CLEAR_DEPTH; i++) {
+                    if (dq.read() == null) {
+                        break;
+                    }
+                }
+            } else if (toRemove.containsKey(RESOURCE_TYPE.lifoQueueu)) {
+                for (Object entry : toRemove.get(RESOURCE_TYPE.lifoQueueu)) {
+                    removeFromLifoQueue(dq, entry);
+                }
+            }
+
+            //reply-to queue
+            MessageQueue rq = new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
+            if (all) {
+                rq.remove();
+            } else {
+                clearMessageQueue(RESOURCE_TYPE.replyToQueueu, rq);
+            }
+
+            //keyed queue
+            KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
+            if (all) {
+                kdq.clear();
+            } else if (toRemove.containsKey(RESOURCE_TYPE.keyedDataQue)) {
+                for (Object entry : toRemove.get(RESOURCE_TYPE.keyedDataQue)) {
+                    kdq.clear((String) entry);
+                }
+            }
+        }
+
+        /**
+         * Reads entries from the lifo queue until the entry equal to {@code entry} is read (or the queue is empty).
+         * All other entries read on the way are written back, so only the matching entry is removed.
+         */
+        private void removeFromLifoQueue(DataQueue dq, Object entry) throws Exception {
+            List<byte[]> otherMessages = new LinkedList<>();
+            DataQueueEntry dqe = dq.read();
+            while (dqe != null && !(entry.equals(dqe.getString())
+                    || entry.equals(new String(dqe.getData(), StandardCharsets.UTF_8)))) {
+                otherMessages.add(dqe.getData());
+                dqe = dq.read();
+            }
+            //write back other messages in reverse order (it is a lifo)
+            Collections.reverse(otherMessages);
+            for (byte[] msg : otherMessages) {
+                dq.write(msg);
+            }
+        }
+
+        /**
+         * Removes the messages registered for the given type from the message queue.
+         * A registered value is either the text of the message (String) or directly the key of the message (byte[]).
+         * Registered texts, which are not present in the queue (anymore), are skipped.
+         */
+        private void clearMessageQueue(RESOURCE_TYPE type, MessageQueue mq) throws AS400SecurityException,
+                ErrorCompletingRequestException, InterruptedException, IOException, ObjectDoesNotExistException {
+            Set<Object> registered = toRemove.get(type);
+            if (registered == null || registered.isEmpty()) {
+                return;
+            }
+            List<QueuedMessage> msgs = Collections.list(mq.getMessages());
+            //several messages may share the same text, the key of the first one is used
+            Map<String, byte[]> keys = new HashMap<>();
+            for (QueuedMessage queuedMessage : msgs) {
+                keys.putIfAbsent(queuedMessage.getText(), queuedMessage.getKey());
+            }
+            for (Object entry : registered) {
+                byte[] messageKey = entry instanceof String ? keys.get((String) entry) : (byte[]) entry;
+                if (messageKey == null) {
+                    LOGGER.debug("Message " + entry + " is not present in the queue, nothing to remove");
+                    continue;
+                }
+                mq.remove(messageKey);
+            }
+        }
+
+        /**
+         * Keyed data queue (FIFO) is used for locking purposes.
+         *
+         * - Each participant writes its unique token under the key cq.jt400.global-lock
+         * - Each participant then peeks the FIFO queue and if the returned string is its own unique token,
+         * execution is allowed
+         * - When execution ends, the token is removed (see unlock)
+         *
+         * If the token is not its own
+         * - peek of the token is repeated until timeout or until its own token is returned (so the second
+         * participant waits, until the first participant removes its token)
+         *
+         * Dead lock prevention
+         *
+         * - part of the unique token is a timestamp, if a participant finds a token, which is too old, the token is
+         * removed
+         * - action to clear-all data removes also the locking tokens
+         *
+         * Therefore only 1 token (thus 1 participant) is allowed to run the tests, the others have to wait.
+         *
+         * Does nothing, if this participant has already requested the lock.
+         *
+         * @throws Exception if the communication with the server fails or the lock is not obtained in time
+         */
+        @Override
+        public void lock() throws Exception {
+            if (key != null) {
+                return;
+            }
+            KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
+
+            Assertions.assertTrue(kdq.isFIFO(), "keyed dataqueue has to be FIFO");
+
+            key = generateKey();
+            //write key into keyed queue
+            kdq.write(LOCK_KEY, key);
+
+            //both values are in milliseconds, spare time is added to be able to remove old locks
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS)
+                    .atMost(LOCK_TIMEOUT + LOCK_TIMEOUT_SLACK, TimeUnit.MILLISECONDS)
+                    .until(() -> pollLock(kdq), Matchers.is(key));
+        }
+
+        /**
+         * Single attempt to obtain the lock.
+         *
+         * @return token which is currently the first one in the queue (the lock is obtained if it is the own token),
+         *         or null if there is no token
+         */
+        private String pollLock(KeyedDataQueue kdq) throws Exception {
+            KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
+            if (kdqe == null) {
+                //own token was removed (by clear-all or as an old lock), try to lock again
+                LOGGER.debug("locked in the queueu was removed, locking again with " + key);
+                kdq.write(LOCK_KEY, key);
+            }
+            String peekedKey = kdqe == null ? null : kdqe.getString();
+            LOGGER.debug("peeked lock " + peekedKey + "(my lock is " + key + ")");
+
+            if (peekedKey != null && !key.equals(peekedKey) && isExpired(peekedKey)) {
+                //read the key (therefore remove it); the entry might be already removed by its owner
+                KeyedDataQueueEntry removed = kdq.read(LOCK_KEY);
+                LOGGER.info("Removed old lock " + (removed == null ? null : removed.getString()));
+                KeyedDataQueueEntry next = kdq.peek(LOCK_KEY);
+                peekedKey = next == null ? null : next.getString();
+            }
+            return peekedKey;
+        }
+
+        /**
+         * @return true if the timestamp (part of the token after ':') is older than the lock timeout
+         */
+        private boolean isExpired(String token) {
+            long createdAt = Long.parseLong(token.substring(token.indexOf(':') + 1));
+            return System.currentTimeMillis() - createdAt > LOCK_TIMEOUT;
+        }
+
+        /**
+         * Removes the first token from the queue and verifies that it is the token of this participant.
+         * Does nothing, if the lock was not requested by this participant.
+         */
+        @Override
+        public void unlock() throws Exception {
+            if (key == null) {
+                //nothing is locked by this participant, token of another participant must not be removed
+                return;
+            }
+            try {
+                KeyedDataQueueEntry released = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE))
+                        .read(LOCK_KEY);
+                Assertions.assertEquals(key, released == null ? null : released.getString());
+            } finally {
+                //clear key (also in case of a failure, so the next lock starts with a new token)
+                key = null;
+            }
+        }
+
+        /**
+         * @return token in format "10 random lowercase alphanumeric characters:current time in milliseconds"
+         */
+        private String generateKey() {
+            return RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT) + ":" + System.currentTimeMillis();
+        }
+
+        /**
+         * Creates a textual overview of the content of the queues.
+         * Entries of the lifo queue are read and written back, as they can not be listed without being removed.
+         */
+        @Override
+        public String dumpQueues() throws Exception {
+            StringBuilder sb = new StringBuilder();
+
+            sb.append("\n* MESSAGE QUEUE\n");
+            sb.append("\t" + joinMessageTexts(JT400_MESSAGE_QUEUE));
+
+            sb.append("\n* INQUIRY QUEUE\n");
+            sb.append("\t" + joinMessageTexts(JT400_REPLY_TO_MESSAGE_QUEUE));
+
+            sb.append("\n* LIFO QUEUE\n");
+            DataQueue dq = new DataQueue(as400, getObjectPath(JT400_LIFO_QUEUE));
+            List<byte[]> lifoMessages = new LinkedList<>();
+            List<String> lifoTexts = new LinkedList<>();
+            for (DataQueueEntry dqe = dq.read(); dqe != null; dqe = dq.read()) {
+                lifoTexts.add(dqe.getString() + " (" + new String(dqe.getData(), StandardCharsets.UTF_8) + ")");
+                lifoMessages.add(dqe.getData());
+            }
+
+            //write back other messages in reverse order (it is a lifo)
+            Collections.reverse(lifoMessages);
+            for (byte[] msg : lifoMessages) {
+                dq.write(msg);
+            }
+            sb.append(String.join(", ", lifoTexts));
+
+            sb.append("\n* KEYED DATA QUEUE\n");
+            KeyedDataQueue kdq = new KeyedDataQueue(as400, getObjectPath(JT400_KEYED_QUEUE));
+            KeyedDataQueueEntry kdqe = kdq.peek(LOCK_KEY);
+            sb.append("\tlock: " + (kdqe == null ? "null" : kdqe.getString()));
+            return sb.toString();
+        }
+
+        /**
+         * @return sorted texts of all messages from the message queue, joined by ", "
+         */
+        private String joinMessageTexts(String queue) throws Exception {
+            return Collections.list(new MessageQueue(as400, getObjectPath(queue)).getMessages())
+                    .stream().map(mq -> mq.getText()).sorted().collect(Collectors.joining(", "));
+        }
+
+        @Override
+        public void sendInquiry(String msg) throws Exception {
+            new MessageQueue(as400, getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE)).sendInquiry(msg,
+                    getObjectPath(JT400_REPLY_TO_MESSAGE_QUEUE));
+        }
+    };
+
+}
+
+/**
+ * Operations executed by the tests directly against the jt400 server (without the Camel routes).
+ */
+interface Jt400ClientHelper {
+
+    //----------------------- messages
+
+    /**
+     * Looks up a message in the reply-to message queue without removing it.
+     *
+     * @param  msg text of the message
+     * @return     message with the given text, or null if there is no such message
+     */
+    QueuedMessage peekReplyToQueueMessage(String msg) throws Exception;
+
+    /**
+     * Sends an inquiry message into the reply-to message queue.
+     *
+     * @param msg text of the inquiry
+     */
+    void sendInquiry(String msg) throws Exception;
+
+    //------------------- clear listeners ------------------------------
+
+    /**
+     * Remembers a value written by a test, so it can be removed by {@link #clearAll(boolean)}.
+     *
+     * @param type  kind of the queue into which the value was written
+     * @param value written value
+     */
+    void registerForRemoval(Jt400TestResource.RESOURCE_TYPE type, Object value);
+
+    /**
+     * Removes data from the queues.
+     *
+     * @param all if true, the whole content of the queues is removed, otherwise only the registered values
+     */
+    void clearAll(boolean all) throws Exception;
+
+    //----------------------- locking
+
+    /**
+     * Obtains the global lock for the tests execution.
+     */
+    void lock() throws Exception;
+
+    /**
+     * Releases the global lock obtained by {@link #lock()}.
+     */
+    void unlock() throws Exception;
+
+    //----------------------- diagnostics
+
+    /**
+     * @return textual overview of the content of the queues
+     */
+    String dumpQueues() throws Exception;
+
+}