This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch 3.27.x in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 8d9dbfa541330239eed90b01bb74114ca3a0318d Author: Lukas Lowinger <[email protected]> AuthorDate: Fri Nov 14 16:31:42 2025 +0100 [fixes #3037] Use preferred way for handling gRPC onError --- integration-tests/grpc/README.adoc | 34 ++++++++++ .../quarkus/component/grpc/it/PingPongImpl.java | 74 ---------------------- .../camel/quarkus/component/grpc/it/GrpcTest.java | 29 +++++---- 3 files changed, 50 insertions(+), 87 deletions(-) diff --git a/integration-tests/grpc/README.adoc b/integration-tests/grpc/README.adoc new file mode 100644 index 0000000000..1bee1528b1 --- /dev/null +++ b/integration-tests/grpc/README.adoc @@ -0,0 +1,34 @@ +== gRPC integration tests + +To generate the implementation of the gRPC stub (e.g. to get a more detailed understanding of how it works under the hood), run `mvn clean install -DskipTests -Dquarkus.package.jar.decompiler.enabled=true` in this module. +Then inspect `target/decompiler/generated-bytecode/org/apache/camel/quarkus/component/grpc/it/model/PingPongGrpc\$PingPongImplBaseQuarkusMethodHandler.java`, which contains something like: + +[source,java] +---- +@Dependent +public class PingPongGrpc$PingPongImplBaseQuarkusMethodHandler extends PingPongImplBase implements CamelQuarkusBindableService { + private GrpcMethodHandler methodHandler; + + public void setMethodHandler(GrpcMethodHandler var1) { + this.methodHandler = var1; + } + + public StreamObserver pingAsyncAsync(StreamObserver var1) { + return this.methodHandler.handleForConsumerStrategy(var1, "pingAsyncAsync"); + } + + public StreamObserver pingAsyncSync(StreamObserver var1) { + return this.methodHandler.handleForConsumerStrategy(var1, "pingAsyncSync"); + } + + public void pingSyncAsync(PingRequest var1, StreamObserver var2) { + this.methodHandler.handle(var1, var2, "pingSyncAsync"); + } + + public void pingSyncSync(PingRequest var1, StreamObserver var2) { + this.methodHandler.handle(var1, var2, "pingSyncSync"); + } +} +---- + +where the `GrpcMethodHandler` is the 
bridge from gRPC into the Camel routing engine. diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java deleted file mode 100644 index 40fb93b61c..0000000000 --- a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.quarkus.component.grpc.it; - -import io.grpc.stub.StreamObserver; -import org.apache.camel.quarkus.component.grpc.it.model.PingRequest; -import org.apache.camel.quarkus.component.grpc.it.model.PongResponse; -import org.jboss.logging.Logger; - -public class PingPongImpl extends org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc.PingPongImplBase { - private static final Logger LOG = Logger.getLogger(PingPongImpl.class); - static final String GRPC_TEST_PONG_VALUE = "PONG"; - - @Override - public void pingSyncSync(PingRequest request, StreamObserver<PongResponse> responseObserver) { - LOG.infof("gRPC server pingSyncSync received data from PingPong service PingId=%s PingName=%s", request.getPingId(), - request.getPingName()); - PongResponse response = PongResponse.newBuilder().setPongName(request.getPingName() + GRPC_TEST_PONG_VALUE) - .setPongId(request.getPingId()).build(); - - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - - @Override - public void pingSyncAsync(PingRequest request, StreamObserver<PongResponse> responseObserver) { - LOG.infof("gRPC server pingSyncAsync received data from PingPong service PingId=%s PingName=%s", request.getPingId(), - request.getPingName()); - PongResponse response = PongResponse.newBuilder().setPongName(request.getPingName() + GRPC_TEST_PONG_VALUE) - .setPongId(request.getPingId()).build(); - - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - - @Override - public StreamObserver<PingRequest> pingAsyncAsync(StreamObserver<PongResponse> responseObserver) { - return new StreamObserver<>() { - @Override - public void onNext(PingRequest request) { - LOG.infof("gRPC server pingAsyncAsync received data from PingPong service PingId=%s PingName=%s", - request.getPingId(), - request.getPingName()); - PongResponse response = PongResponse.newBuilder().setPongName(request.getPingName() + GRPC_TEST_PONG_VALUE) - .setPongId(request.getPingId()).build(); - 
responseObserver.onNext(response); - } - - @Override - public void onError(Throwable throwable) { - responseObserver.onError(throwable); - } - - @Override - public void onCompleted() { - responseObserver.onCompleted(); - } - }; - } -} diff --git a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java index acfd42b6d7..961807b4cc 100644 --- a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java +++ b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java @@ -63,7 +63,6 @@ import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_E import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT; import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER; import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.GRPC_JWT_SECRET; -import static org.apache.camel.quarkus.component.grpc.it.PingPongImpl.GRPC_TEST_PONG_VALUE; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -81,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.fail; class GrpcTest { private static final String GRPC_TEST_PING_VALUE = "PING"; + private static final String GRPC_TEST_PONG_VALUE = "PONG"; private static final int GRPC_TEST_PING_ID = 1234; private static final Logger LOG = Logger.getLogger(GrpcTest.class); @@ -198,15 +198,27 @@ class GrpcTest { Config config = ConfigProvider.getConfig(); Integer port = config.getValue("camel.grpc.test.forward.error.server.port", Integer.class); CountDownLatch latch = new CountDownLatch(1); + PingRequest pingRequest = PingRequest.newBuilder() + .setPingName(GRPC_TEST_PING_VALUE) + .setPingId(GRPC_TEST_PING_ID) + .build(); ManagedChannel channel = 
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build(); try { PingPongStub pingPongStub = PingPongGrpc.newStub(channel); LOG.info("forwardOnError: preparing observers"); - PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch, true); + PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch); StreamObserver<PingRequest> requestObserver = pingPongStub.pingAsyncAsync(responseObserver); - LOG.info("forwardOnError: calling onNext(null)"); - requestObserver.onNext(null); + // gRPC uses DelayedClientCall, which queues requests until the real call is ready. + // Thus we first need to establish the real call by calling e.g. `onNext` and waiting for its response. + requestObserver.onNext(pingRequest); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> responseObserver.getPongResponse() != null); + // Then we can finally mimic a failure by calling `onError`. + // If we did not establish the real call first, the DelayedClientCall would just call `onError` on the ResponseObserver immediately without propagating it to the real call + // (which we need for testing the forwardOnError option on the Camel server side). 
+ // More details in: https://github.com/grpc/grpc-java/blob/v1.76.0/core/src/main/java/io/grpc/internal/DelayedClientCall.java#L256 + // Also note this comment https://github.com/apache/camel-quarkus/issues/7897#issuecomment-3480980023 + requestObserver.onError(new IllegalStateException("Forced exception")); LOG.info("forwardOnError: waiting latch.await(5s)"); assertTrue(latch.await(5, TimeUnit.SECONDS)); @@ -657,17 +669,11 @@ class GrpcTest { static final class PongResponseStreamObserver implements StreamObserver<PongResponse> { private final CountDownLatch latch; - private final boolean simulateError; private PongResponse pongResponse; private volatile Throwable errorResponse; public PongResponseStreamObserver(CountDownLatch latch) { - this(latch, false); - } - - public PongResponseStreamObserver(CountDownLatch latch, boolean simulateError) { this.latch = latch; - this.simulateError = simulateError; } public PongResponse getPongResponse() { @@ -682,9 +688,6 @@ class GrpcTest { public void onNext(PongResponse value) { LOG.infof("PongResponseStreamObserver#onNext:%s", value); pongResponse = value; - if (simulateError) { - throw new IllegalStateException("Forced exception"); - } } @Override
