This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new d8db8d985086 CAMEL-23450: Remove unmaintained junit-toolbox from
camel-grpc and camel-thrift tests (#23648)
d8db8d985086 is described below
commit d8db8d985086720d2da1b19f5ec536a385eaf5a7
Author: Ravi <[email protected]>
AuthorDate: Sat May 30 11:49:07 2026 +0530
CAMEL-23450: Remove unmaintained junit-toolbox from camel-grpc and
camel-thrift tests (#23648)
Signed-off-by: Ravi <[email protected]>
---
components/camel-grpc/pom.xml | 12 --
.../component/grpc/GrpcConsumerConcurrentTest.java | 159 +++++++++++----------
components/camel-thrift/pom.xml | 12 --
.../thrift/ThriftConsumerConcurrentTest.java | 126 ++++++++--------
parent/pom.xml | 1 -
5 files changed, 153 insertions(+), 157 deletions(-)
diff --git a/components/camel-grpc/pom.xml b/components/camel-grpc/pom.xml
index 9d5677a065f0..9d0834ba099b 100644
--- a/components/camel-grpc/pom.xml
+++ b/components/camel-grpc/pom.xml
@@ -115,18 +115,6 @@
<version>${mockito-version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.googlecode.junit-toolbox</groupId>
- <artifactId>junit-toolbox</artifactId>
- <version>${junit-toolbox-version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<!-- GRPC does not support the jakarta annotation, see
https://github.com/grpc/grpc-java/issues/9179 -->
<dependency>
<groupId>javax.annotation</groupId>
diff --git
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
index dbac2d3a16c7..e4b7b8b7df29 100644
---
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
+++
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcConsumerConcurrentTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.camel.component.grpc;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.googlecode.junittoolbox.MultithreadingTester;
-import com.googlecode.junittoolbox.RunnableAssert;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
@@ -56,87 +59,95 @@ public class GrpcConsumerConcurrentTest extends
GrpcTestSupport {
}
@Test
- public void testAsyncWithConcurrentThreads() {
+ public void testAsyncWithConcurrentThreads() throws Exception {
int asyncPort = getRoutePort("grpc-async");
- RunnableAssert ra = new RunnableAssert("foo") {
-
- @Override
- public void run() {
- final CountDownLatch latch = new CountDownLatch(1);
- ManagedChannel asyncRequestChannel
- = NettyChannelBuilder.forAddress("localhost",
asyncPort).usePlaintext()
- .build();
- PingPongGrpc.PingPongStub asyncNonBlockingStub =
PingPongGrpc.newStub(asyncRequestChannel);
-
- PongResponseStreamObserver responseObserver = new
PongResponseStreamObserver(latch);
- int instanceId = createId();
-
- final PingRequest pingRequest
- =
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
- StreamObserver<PingRequest> requestObserver =
asyncNonBlockingStub.pingAsyncAsync(responseObserver);
- requestObserver.onNext(pingRequest);
- requestObserver.onNext(pingRequest);
- requestObserver.onCompleted();
- try {
- assertTrue(latch.await(5, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- LOG.debug("Unhandled exception (probably safe to ignore):
{}", e.getMessage(), e);
- }
-
- PongResponse pongResponse = responseObserver.getPongResponse();
-
- assertNotNull(pongResponse, "instanceId = " + instanceId);
- assertEquals(instanceId, pongResponse.getPongId());
- assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE,
pongResponse.getPongName());
-
- asyncRequestChannel.shutdown().shutdownNow();
+ runConcurrent(() -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ManagedChannel asyncRequestChannel
+ = NettyChannelBuilder.forAddress("localhost",
asyncPort).usePlaintext()
+ .build();
+ PingPongGrpc.PingPongStub asyncNonBlockingStub =
PingPongGrpc.newStub(asyncRequestChannel);
+
+ PongResponseStreamObserver responseObserver = new
PongResponseStreamObserver(latch);
+ int instanceId = createId();
+
+ final PingRequest pingRequest
+ =
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
+ StreamObserver<PingRequest> requestObserver =
asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onCompleted();
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ LOG.debug("Unhandled exception (probably safe to ignore): {}",
e.getMessage(), e);
}
- };
- new
MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
+ PongResponse pongResponse = responseObserver.getPongResponse();
+
+ assertNotNull(pongResponse, "instanceId = " + instanceId);
+ assertEquals(instanceId, pongResponse.getPongId());
+ assertEquals(GRPC_TEST_PING_VALUE + GRPC_TEST_PONG_VALUE,
pongResponse.getPongName());
+
+ asyncRequestChannel.shutdown().shutdownNow();
+ return null;
+ });
}
@Test
- public void testHeadersWithConcurrentThreads() {
+ public void testHeadersWithConcurrentThreads() throws Exception {
int headersPort = getRoutePort("grpc-headers");
- RunnableAssert ra = new RunnableAssert("foo") {
-
- @Override
- public void run() {
- int instanceId = createId();
- final CountDownLatch latch = new CountDownLatch(1);
- ManagedChannel asyncRequestChannel =
NettyChannelBuilder.forAddress("localhost", headersPort)
- .userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
- .usePlaintext().build();
- PingPongGrpc.PingPongStub asyncNonBlockingStub =
PingPongGrpc.newStub(asyncRequestChannel);
-
- PongResponseStreamObserver responseObserver = new
PongResponseStreamObserver(latch);
-
- final PingRequest pingRequest
- =
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
- StreamObserver<PingRequest> requestObserver =
asyncNonBlockingStub.pingAsyncAsync(responseObserver);
- requestObserver.onNext(pingRequest);
- requestObserver.onNext(pingRequest);
- requestObserver.onCompleted();
- try {
- assertTrue(latch.await(5, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while waiting for the response", e);
- }
-
- PongResponse pongResponse = responseObserver.getPongResponse();
-
- assertNotNull(pongResponse, "instanceId = " + instanceId);
- assertEquals(instanceId, pongResponse.getPongId());
- assertEquals(GRPC_USER_AGENT_PREFIX + instanceId,
pongResponse.getPongName());
-
- asyncRequestChannel.shutdown().shutdownNow();
+ runConcurrent(() -> {
+ int instanceId = createId();
+ final CountDownLatch latch = new CountDownLatch(1);
+ ManagedChannel asyncRequestChannel =
NettyChannelBuilder.forAddress("localhost", headersPort)
+ .userAgent(GRPC_USER_AGENT_PREFIX + instanceId)
+ .usePlaintext().build();
+ PingPongGrpc.PingPongStub asyncNonBlockingStub =
PingPongGrpc.newStub(asyncRequestChannel);
+
+ PongResponseStreamObserver responseObserver = new
PongResponseStreamObserver(latch);
+
+ final PingRequest pingRequest
+ =
PingRequest.newBuilder().setPingName(GRPC_TEST_PING_VALUE).setPingId(instanceId).build();
+ StreamObserver<PingRequest> requestObserver =
asyncNonBlockingStub.pingAsyncAsync(responseObserver);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onNext(pingRequest);
+ requestObserver.onCompleted();
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while waiting for the response", e);
}
- };
- new
MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
+ PongResponse pongResponse = responseObserver.getPongResponse();
+
+ assertNotNull(pongResponse, "instanceId = " + instanceId);
+ assertEquals(instanceId, pongResponse.getPongId());
+ assertEquals(GRPC_USER_AGENT_PREFIX + instanceId,
pongResponse.getPongName());
+
+ asyncRequestChannel.shutdown().shutdownNow();
+ return null;
+ });
+ }
+
+ private void runConcurrent(Callable<?> task) throws Exception {
+ ExecutorService executor =
Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
+ try {
+ List<Future<?>> futures = new ArrayList<>();
+ for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) {
+ futures.add(executor.submit(() -> {
+ for (int round = 0; round < ROUNDS_PER_THREAD_COUNT;
round++) {
+ task.call();
+ }
+ return null;
+ }));
+ }
+ for (Future<?> future : futures) {
+ future.get(1, TimeUnit.MINUTES);
+ }
+ } finally {
+ executor.shutdownNow();
+ }
}
@Override
diff --git a/components/camel-thrift/pom.xml b/components/camel-thrift/pom.xml
index c9d3c38eddea..5aa3b78168d3 100644
--- a/components/camel-thrift/pom.xml
+++ b/components/camel-thrift/pom.xml
@@ -75,18 +75,6 @@
<artifactId>gson</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.googlecode.junit-toolbox</groupId>
- <artifactId>junit-toolbox</artifactId>
- <version>${junit-toolbox-version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
<build>
diff --git
a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
index b743ded59022..63f071c893fe 100644
---
a/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
+++
b/components/camel-thrift/src/test/java/org/apache/camel/component/thrift/ThriftConsumerConcurrentTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.camel.component.thrift;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import com.googlecode.junittoolbox.MultithreadingTester;
-import com.googlecode.junittoolbox.RunnableAssert;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.thrift.generated.Calculator;
import org.apache.camel.component.thrift.generated.Operation;
@@ -37,7 +40,6 @@ import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -68,68 +70,76 @@ public class ThriftConsumerConcurrentTest extends
CamelTestSupport {
}
@Test
- public void testSyncWithConcurrentThreads() {
- RunnableAssert ra = new
RunnableAssert("testSyncWithConcurrentThreads") {
-
- @Override
- public void run() throws TTransportException {
- TTransport transport = new TSocket("localhost",
getPortForRoute(0));
- transport.open();
- TProtocol protocol = new TBinaryProtocol(new
TFramedTransport(transport));
- Calculator.Client client = (new
Calculator.Client.Factory()).getClient(protocol);
-
- int instanceId = createId();
+ public void testSyncWithConcurrentThreads() throws Exception {
+ runConcurrent(() -> {
+ TTransport transport = new TSocket("localhost",
getPortForRoute(0));
+ transport.open();
+ TProtocol protocol = new TBinaryProtocol(new
TFramedTransport(transport));
+ Calculator.Client client = (new
Calculator.Client.Factory()).getClient(protocol);
+
+ int instanceId = createId();
+
+ int calculateResponse = 0;
+ try {
+ calculateResponse = client.calculate(1, new Work(instanceId,
THRIFT_TEST_NUM1, Operation.MULTIPLY));
+ } catch (TException e) {
+ LOG.info("Exception", e);
+ }
- int calculateResponse = 0;
- try {
- calculateResponse = client.calculate(1, new
Work(instanceId, THRIFT_TEST_NUM1, Operation.MULTIPLY));
- } catch (TException e) {
- LOG.info("Exception", e);
- }
+ assertNotEquals(0, calculateResponse, "instanceId = " +
instanceId);
+ assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
- assertNotEquals(0, calculateResponse, "instanceId = " +
instanceId);
- assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
+ transport.close();
+ return null;
+ });
+ }
- transport.close();
+ @Test
+ public void testAsyncWithConcurrentThreads() throws Exception {
+ runConcurrent(() -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ TNonblockingTransport transport = new
TNonblockingSocket("localhost", getPortForRoute(1));
+ Calculator.AsyncClient client
+ = (new Calculator.AsyncClient.Factory(new
TAsyncClientManager(), new TBinaryProtocol.Factory()))
+ .getAsyncClient(transport);
+
+ int instanceId = createId();
+ CalculateAsyncMethodCallback calculateCallback = new
CalculateAsyncMethodCallback(latch);
+ try {
+ client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1,
Operation.MULTIPLY), calculateCallback);
+ } catch (TException e) {
+ LOG.info("Exception", e);
}
- };
+ latch.await(5, TimeUnit.SECONDS);
- new
MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
- }
+ int calculateResponse = calculateCallback.getCalculateResponse();
+ LOG.debug("instanceId = {}", instanceId);
+ assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
- @Test
- public void testAsyncWithConcurrentThreads() {
- RunnableAssert ra = new
RunnableAssert("testAsyncWithConcurrentThreads") {
+ transport.close();
+ return null;
+ });
+ }
- @Override
- public void run() throws TTransportException, IOException,
InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
-
- TNonblockingTransport transport = new
TNonblockingSocket("localhost", getPortForRoute(1));
- Calculator.AsyncClient client
- = (new Calculator.AsyncClient.Factory(new
TAsyncClientManager(), new TBinaryProtocol.Factory()))
- .getAsyncClient(transport);
-
- int instanceId = createId();
- CalculateAsyncMethodCallback calculateCallback = new
CalculateAsyncMethodCallback(latch);
- try {
- client.calculate(1, new Work(instanceId, THRIFT_TEST_NUM1,
Operation.MULTIPLY), calculateCallback);
- } catch (TException e) {
- LOG.info("Exception", e);
- }
- latch.await(5, TimeUnit.SECONDS);
-
- int calculateResponse =
calculateCallback.getCalculateResponse();
- LOG.debug("instanceId = {}", instanceId);
- assertEquals(instanceId * THRIFT_TEST_NUM1, calculateResponse);
-
- transport.close();
+ private void runConcurrent(Callable<?> task) throws Exception {
+ ExecutorService executor =
Executors.newFixedThreadPool(CONCURRENT_THREAD_COUNT);
+ try {
+ List<Future<?>> futures = new ArrayList<>();
+ for (int thread = 0; thread < CONCURRENT_THREAD_COUNT; thread++) {
+ futures.add(executor.submit(() -> {
+ for (int round = 0; round < ROUNDS_PER_THREAD_COUNT;
round++) {
+ task.call();
+ }
+ return null;
+ }));
}
- };
-
- new
MultithreadingTester().add(ra).numThreads(CONCURRENT_THREAD_COUNT).numRoundsPerThread(ROUNDS_PER_THREAD_COUNT)
- .run();
+ for (Future<?> future : futures) {
+ future.get(1, TimeUnit.MINUTES);
+ }
+ } finally {
+ executor.shutdownNow();
+ }
}
public class CalculateAsyncMethodCallback implements
AsyncMethodCallback<Integer> {
diff --git a/parent/pom.xml b/parent/pom.xml
index f271454e3e6b..a7559b181cae 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -325,7 +325,6 @@
<nullaway-version>0.13.4</nullaway-version>
<jt400-version>21.0.6</jt400-version>
<jte-version>3.2.4</jte-version>
- <junit-toolbox-version>2.4</junit-toolbox-version>
<junit-jupiter-version>5.13.4</junit-jupiter-version>
<junit6-jupiter-version>6.1.0</junit6-jupiter-version>
<junit-pioneer-version>2.3.0</junit-pioneer-version>