This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 8d313225a5 [fixes #3037] Use preferred way for handling gRPC onError
8d313225a5 is described below
commit 8d313225a5a914dc305f358a2f0cead28ca97256
Author: Lukas Lowinger <[email protected]>
AuthorDate: Fri Nov 14 16:31:42 2025 +0100
[fixes #3037] Use preferred way for handling gRPC onError
---
integration-tests/grpc/README.adoc | 34 ++++++++++
.../quarkus/component/grpc/it/PingPongImpl.java | 74 ----------------------
.../camel/quarkus/component/grpc/it/GrpcTest.java | 29 +++++----
3 files changed, 50 insertions(+), 87 deletions(-)
diff --git a/integration-tests/grpc/README.adoc
b/integration-tests/grpc/README.adoc
new file mode 100644
index 0000000000..1bee1528b1
--- /dev/null
+++ b/integration-tests/grpc/README.adoc
@@ -0,0 +1,34 @@
+== gRPC integration tests
+
+To generate implementation of GRPC stub (for eg. more detailed understanding
of how it actually works under the hood), you can use `mvn clean install
-DskipTests -Dquarkus.package.jar.decompiler.enabled=true` in this module.
+Then you can note
`target/decompiler/generated-bytecode/org/apache/camel/quarkus/component/grpc/it/model/PingPongGrpc\$PingPongImplBaseQuarkusMethodHandler.java`
which contains something like:
+
+[source,java]
+----
+@Dependent
+public class PingPongGrpc$PingPongImplBaseQuarkusMethodHandler extends
PingPongImplBase implements CamelQuarkusBindableService {
+ private GrpcMethodHandler methodHandler;
+
+ public void setMethodHandler(GrpcMethodHandler var1) {
+ this.methodHandler = var1;
+ }
+
+ public StreamObserver pingAsyncAsync(StreamObserver var1) {
+ return this.methodHandler.handleForConsumerStrategy(var1,
"pingAsyncAsync");
+ }
+
+ public StreamObserver pingAsyncSync(StreamObserver var1) {
+ return this.methodHandler.handleForConsumerStrategy(var1,
"pingAsyncSync");
+ }
+
+ public void pingSyncAsync(PingRequest var1, StreamObserver var2) {
+ this.methodHandler.handle(var1, var2, "pingSyncAsync");
+ }
+
+ public void pingSyncSync(PingRequest var1, StreamObserver var2) {
+ this.methodHandler.handle(var1, var2, "pingSyncSync");
+ }
+}
+----
+
+where the `GrpcMethodHandler` is the bridge from gRPC into the Camel routing
engine.
diff --git
a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java
b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java
deleted file mode 100644
index 40fb93b61c..0000000000
---
a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.grpc.it;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
-import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
-import org.jboss.logging.Logger;
-
-public class PingPongImpl extends
org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc.PingPongImplBase {
- private static final Logger LOG = Logger.getLogger(PingPongImpl.class);
- static final String GRPC_TEST_PONG_VALUE = "PONG";
-
- @Override
- public void pingSyncSync(PingRequest request, StreamObserver<PongResponse>
responseObserver) {
- LOG.infof("gRPC server pingSyncSync received data from PingPong
service PingId=%s PingName=%s", request.getPingId(),
- request.getPingName());
- PongResponse response =
PongResponse.newBuilder().setPongName(request.getPingName() +
GRPC_TEST_PONG_VALUE)
- .setPongId(request.getPingId()).build();
-
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- }
-
- @Override
- public void pingSyncAsync(PingRequest request,
StreamObserver<PongResponse> responseObserver) {
- LOG.infof("gRPC server pingSyncAsync received data from PingPong
service PingId=%s PingName=%s", request.getPingId(),
- request.getPingName());
- PongResponse response =
PongResponse.newBuilder().setPongName(request.getPingName() +
GRPC_TEST_PONG_VALUE)
- .setPongId(request.getPingId()).build();
-
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- }
-
- @Override
- public StreamObserver<PingRequest>
pingAsyncAsync(StreamObserver<PongResponse> responseObserver) {
- return new StreamObserver<>() {
- @Override
- public void onNext(PingRequest request) {
- LOG.infof("gRPC server pingAsyncAsync received data from
PingPong service PingId=%s PingName=%s",
- request.getPingId(),
- request.getPingName());
- PongResponse response =
PongResponse.newBuilder().setPongName(request.getPingName() +
GRPC_TEST_PONG_VALUE)
- .setPongId(request.getPingId()).build();
- responseObserver.onNext(response);
- }
-
- @Override
- public void onError(Throwable throwable) {
- responseObserver.onError(throwable);
- }
-
- @Override
- public void onCompleted() {
- responseObserver.onCompleted();
- }
- };
- }
-}
diff --git
a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
index acfd42b6d7..961807b4cc 100644
---
a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
+++
b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
@@ -63,7 +63,6 @@ import static
org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_E
import static
org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
import static
org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
import static
org.apache.camel.quarkus.component.grpc.it.GrpcRoute.GRPC_JWT_SECRET;
-import static
org.apache.camel.quarkus.component.grpc.it.PingPongImpl.GRPC_TEST_PONG_VALUE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -81,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.fail;
class GrpcTest {
private static final String GRPC_TEST_PING_VALUE = "PING";
+ private static final String GRPC_TEST_PONG_VALUE = "PONG";
private static final int GRPC_TEST_PING_ID = 1234;
private static final Logger LOG = Logger.getLogger(GrpcTest.class);
@@ -198,15 +198,27 @@ class GrpcTest {
Config config = ConfigProvider.getConfig();
Integer port =
config.getValue("camel.grpc.test.forward.error.server.port", Integer.class);
CountDownLatch latch = new CountDownLatch(1);
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(GRPC_TEST_PING_VALUE)
+ .setPingId(GRPC_TEST_PING_ID)
+ .build();
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost",
port).usePlaintext().build();
try {
PingPongStub pingPongStub = PingPongGrpc.newStub(channel);
LOG.info("forwardOnError: preparing observers");
- PongResponseStreamObserver responseObserver = new
PongResponseStreamObserver(latch, true);
+ PongResponseStreamObserver responseObserver = new
PongResponseStreamObserver(latch);
StreamObserver<PingRequest> requestObserver =
pingPongStub.pingAsyncAsync(responseObserver);
- LOG.info("forwardOnError: calling onNext(null)");
- requestObserver.onNext(null);
+ // GRPC uses DelayedClientCall, which queues requests before real
call is ready.
+ // Thus we first need to establish real call with calling eg.
`onNext` and wait for its response.
+ requestObserver.onNext(pingRequest);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() ->
responseObserver.getPongResponse() != null);
+ // Then we can finally mimic failure by calling `onError`.
+ // If we wouldn't establish the real call first, the
DelayedClientCall would just call `onError` on ResponseObserver immediately
without propagating it to real call
+ // (which we need for testing forwardOnError option on camel
server side).
+ // More details in:
https://github.com/grpc/grpc-java/blob/v1.76.0/core/src/main/java/io/grpc/internal/DelayedClientCall.java#L256
+ // Also note this comment
https://github.com/apache/camel-quarkus/issues/7897#issuecomment-3480980023
+ requestObserver.onError(new IllegalStateException("Forced
exception"));
LOG.info("forwardOnError: waiting latch.await(5s)");
assertTrue(latch.await(5, TimeUnit.SECONDS));
@@ -657,17 +669,11 @@ class GrpcTest {
static final class PongResponseStreamObserver implements
StreamObserver<PongResponse> {
private final CountDownLatch latch;
- private final boolean simulateError;
private PongResponse pongResponse;
private volatile Throwable errorResponse;
public PongResponseStreamObserver(CountDownLatch latch) {
- this(latch, false);
- }
-
- public PongResponseStreamObserver(CountDownLatch latch, boolean
simulateError) {
this.latch = latch;
- this.simulateError = simulateError;
}
public PongResponse getPongResponse() {
@@ -682,9 +688,6 @@ class GrpcTest {
public void onNext(PongResponse value) {
LOG.infof("PongResponseStreamObserver#onNext:%s", value);
pongResponse = value;
- if (simulateError) {
- throw new IllegalStateException("Forced exception");
- }
}
@Override