This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d313225a5 [fixes #3037] Use preferred way for handling gRPC onError
8d313225a5 is described below

commit 8d313225a5a914dc305f358a2f0cead28ca97256
Author: Lukas Lowinger <[email protected]>
AuthorDate: Fri Nov 14 16:31:42 2025 +0100

    [fixes #3037] Use preferred way for handling gRPC onError
---
 integration-tests/grpc/README.adoc                 | 34 ++++++++++
 .../quarkus/component/grpc/it/PingPongImpl.java    | 74 ----------------------
 .../camel/quarkus/component/grpc/it/GrpcTest.java  | 29 +++++----
 3 files changed, 50 insertions(+), 87 deletions(-)

diff --git a/integration-tests/grpc/README.adoc 
b/integration-tests/grpc/README.adoc
new file mode 100644
index 0000000000..1bee1528b1
--- /dev/null
+++ b/integration-tests/grpc/README.adoc
@@ -0,0 +1,34 @@
+== gRPC integration tests
+
+To generate the implementation of the gRPC stub (e.g. for a more detailed understanding 
of how it actually works under the hood), you can run `mvn clean install 
-DskipTests -Dquarkus.package.jar.decompiler.enabled=true` in this module.
+Then you can look at 
`target/decompiler/generated-bytecode/org/apache/camel/quarkus/component/grpc/it/model/PingPongGrpc\$PingPongImplBaseQuarkusMethodHandler.java`,
 which contains something like:
+
+[source,java]
+----
+@Dependent
+public class PingPongGrpc$PingPongImplBaseQuarkusMethodHandler extends 
PingPongImplBase implements CamelQuarkusBindableService {
+   private GrpcMethodHandler methodHandler;
+
+   public void setMethodHandler(GrpcMethodHandler var1) {
+      this.methodHandler = var1;
+   }
+
+   public StreamObserver pingAsyncAsync(StreamObserver var1) {
+      return this.methodHandler.handleForConsumerStrategy(var1, 
"pingAsyncAsync");
+   }
+
+   public StreamObserver pingAsyncSync(StreamObserver var1) {
+      return this.methodHandler.handleForConsumerStrategy(var1, 
"pingAsyncSync");
+   }
+
+   public void pingSyncAsync(PingRequest var1, StreamObserver var2) {
+      this.methodHandler.handle(var1, var2, "pingSyncAsync");
+   }
+
+   public void pingSyncSync(PingRequest var1, StreamObserver var2) {
+      this.methodHandler.handle(var1, var2, "pingSyncSync");
+   }
+}
+----
+
+where the `GrpcMethodHandler` is the bridge from gRPC into the Camel routing 
engine.
diff --git 
a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java
 
b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java
deleted file mode 100644
index 40fb93b61c..0000000000
--- 
a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/PingPongImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.grpc.it;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
-import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
-import org.jboss.logging.Logger;
-
-public class PingPongImpl extends 
org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc.PingPongImplBase {
-    private static final Logger LOG = Logger.getLogger(PingPongImpl.class);
-    static final String GRPC_TEST_PONG_VALUE = "PONG";
-
-    @Override
-    public void pingSyncSync(PingRequest request, StreamObserver<PongResponse> 
responseObserver) {
-        LOG.infof("gRPC server pingSyncSync received data from PingPong 
service PingId=%s PingName=%s", request.getPingId(),
-                request.getPingName());
-        PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE)
-                .setPongId(request.getPingId()).build();
-
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
-    }
-
-    @Override
-    public void pingSyncAsync(PingRequest request, 
StreamObserver<PongResponse> responseObserver) {
-        LOG.infof("gRPC server pingSyncAsync received data from PingPong 
service PingId=%s PingName=%s", request.getPingId(),
-                request.getPingName());
-        PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE)
-                .setPongId(request.getPingId()).build();
-
-        responseObserver.onNext(response);
-        responseObserver.onCompleted();
-    }
-
-    @Override
-    public StreamObserver<PingRequest> 
pingAsyncAsync(StreamObserver<PongResponse> responseObserver) {
-        return new StreamObserver<>() {
-            @Override
-            public void onNext(PingRequest request) {
-                LOG.infof("gRPC server pingAsyncAsync received data from 
PingPong service PingId=%s PingName=%s",
-                        request.getPingId(),
-                        request.getPingName());
-                PongResponse response = 
PongResponse.newBuilder().setPongName(request.getPingName() + 
GRPC_TEST_PONG_VALUE)
-                        .setPongId(request.getPingId()).build();
-                responseObserver.onNext(response);
-            }
-
-            @Override
-            public void onError(Throwable throwable) {
-                responseObserver.onError(throwable);
-            }
-
-            @Override
-            public void onCompleted() {
-                responseObserver.onCompleted();
-            }
-        };
-    }
-}
diff --git 
a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
 
b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
index acfd42b6d7..961807b4cc 100644
--- 
a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
+++ 
b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
@@ -63,7 +63,6 @@ import static 
org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_E
 import static 
org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
 import static 
org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
 import static 
org.apache.camel.quarkus.component.grpc.it.GrpcRoute.GRPC_JWT_SECRET;
-import static 
org.apache.camel.quarkus.component.grpc.it.PingPongImpl.GRPC_TEST_PONG_VALUE;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -81,6 +80,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 class GrpcTest {
 
     private static final String GRPC_TEST_PING_VALUE = "PING";
+    private static final String GRPC_TEST_PONG_VALUE = "PONG";
     private static final int GRPC_TEST_PING_ID = 1234;
     private static final Logger LOG = Logger.getLogger(GrpcTest.class);
 
@@ -198,15 +198,27 @@ class GrpcTest {
         Config config = ConfigProvider.getConfig();
         Integer port = 
config.getValue("camel.grpc.test.forward.error.server.port", Integer.class);
         CountDownLatch latch = new CountDownLatch(1);
+        PingRequest pingRequest = PingRequest.newBuilder()
+                .setPingName(GRPC_TEST_PING_VALUE)
+                .setPingId(GRPC_TEST_PING_ID)
+                .build();
 
         ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 
port).usePlaintext().build();
         try {
             PingPongStub pingPongStub = PingPongGrpc.newStub(channel);
             LOG.info("forwardOnError: preparing observers");
-            PongResponseStreamObserver responseObserver = new 
PongResponseStreamObserver(latch, true);
+            PongResponseStreamObserver responseObserver = new 
PongResponseStreamObserver(latch);
             StreamObserver<PingRequest> requestObserver = 
pingPongStub.pingAsyncAsync(responseObserver);
-            LOG.info("forwardOnError: calling onNext(null)");
-            requestObserver.onNext(null);
+            // gRPC uses DelayedClientCall, which queues requests before the real 
call is ready.
+            // Thus we first need to establish the real call by calling e.g. 
`onNext` and waiting for its response.
+            requestObserver.onNext(pingRequest);
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
responseObserver.getPongResponse() != null);
+            // Then we can finally mimic a failure by calling `onError`.
+            // If we didn't establish the real call first, the 
DelayedClientCall would just call `onError` on the ResponseObserver immediately 
without propagating it to the real call
+            // (which we need for testing the forwardOnError option on the Camel 
server side).
+            // More details in: 
https://github.com/grpc/grpc-java/blob/v1.76.0/core/src/main/java/io/grpc/internal/DelayedClientCall.java#L256
+            // Also note this comment 
https://github.com/apache/camel-quarkus/issues/7897#issuecomment-3480980023
+            requestObserver.onError(new IllegalStateException("Forced 
exception"));
 
             LOG.info("forwardOnError: waiting latch.await(5s)");
             assertTrue(latch.await(5, TimeUnit.SECONDS));
@@ -657,17 +669,11 @@ class GrpcTest {
 
     static final class PongResponseStreamObserver implements 
StreamObserver<PongResponse> {
         private final CountDownLatch latch;
-        private final boolean simulateError;
         private PongResponse pongResponse;
         private volatile Throwable errorResponse;
 
         public PongResponseStreamObserver(CountDownLatch latch) {
-            this(latch, false);
-        }
-
-        public PongResponseStreamObserver(CountDownLatch latch, boolean 
simulateError) {
             this.latch = latch;
-            this.simulateError = simulateError;
         }
 
         public PongResponse getPongResponse() {
@@ -682,9 +688,6 @@ class GrpcTest {
         public void onNext(PongResponse value) {
             LOG.infof("PongResponseStreamObserver#onNext:%s", value);
             pongResponse = value;
-            if (simulateError) {
-                throw new IllegalStateException("Forced exception");
-            }
         }
 
         @Override

Reply via email to