This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-grpc.git


The following commit(s) were added to refs/heads/main by this push:
     new 82300037 Rich error model client API (#645)
82300037 is described below

commit 82300037e039bcd8fd23abf0de6ed3967e4cbc27
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 6 16:17:18 2026 +0100

    Rich error model client API (#645)
    
    * Initial plan
    
    * Add rich error model client API (MetadataStatus) from akka-grpc PRs #1665 and #1740
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix doc comment grammar in MetadataStatus trait
    
    Co-authored-by: pjfanning <[email protected]>
    
    * format
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../myapp/helloworld/grpc/RichErrorImpl.java       |   1 +
 .../helloworld/grpc/RichErrorModelNativeTest.java  | 103 +++++++++++
 .../myapp/helloworld/grpc/RichErrorNativeImpl.java |   5 +-
 .../grpc/scaladsl/RichErrorModelNativeSpec.scala   | 204 +++++++++++++++++++++
 .../myapp/helloworld/LiftedGreeterClient.java      |   1 +
 .../LoggingErrorHandlingGreeterServer.java         |   3 +
 .../apache/pekko/grpc/GrpcServiceException.scala   |  18 +-
 .../apache/pekko/grpc/internal/MetadataImpl.scala  |  49 ++++-
 .../pekko/grpc/internal/RequestBuilderImpl.scala   |  44 ++++-
 .../org/apache/pekko/grpc/javadsl/Metadata.scala   |  16 ++
 .../org/apache/pekko/grpc/scaladsl/Metadata.scala  |  17 ++
 11 files changed, 445 insertions(+), 16 deletions(-)

diff --git 
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java 
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
index f7118b36..542431dc 100644
--- 
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
+++ 
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
@@ -48,6 +48,7 @@ public class RichErrorImpl implements GreeterService {
     future.completeExceptionally(statusRuntimeException);
     return future;
   }
+
   // #rich_error_model_unary
 
   @Override
diff --git 
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java
 
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java
new file mode 100644
index 00000000..29cb4b59
--- /dev/null
+++ 
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package example.myapp.helloworld.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.rpc.error_details.LocalizedMessage;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletionStage;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.grpc.GrpcClientSettings;
+import org.apache.pekko.grpc.GrpcServiceException;
+import org.apache.pekko.grpc.javadsl.MetadataStatus;
+import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.http.javadsl.ServerBinding;
+import org.apache.pekko.http.javadsl.model.HttpRequest;
+import org.apache.pekko.http.javadsl.model.HttpResponse;
+import org.junit.Assert;
+import org.junit.Test;
+import org.scalatestplus.junit.JUnitSuite;
+
+public class RichErrorModelNativeTest extends JUnitSuite {
+
+  private ServerBinding run(ActorSystem sys) throws Exception {
+
+    GreeterService impl = new RichErrorNativeImpl();
+
+    org.apache.pekko.japi.function.Function<HttpRequest, 
CompletionStage<HttpResponse>> service =
+        GreeterServiceHandlerFactory.create(impl, sys);
+    CompletionStage<ServerBinding> bound =
+        Http.get(sys).newServerAt("127.0.0.1", 8091).bind(service);
+
+    bound.thenAccept(
+        binding -> {
+          System.out.println("gRPC server bound to: " + 
binding.localAddress());
+        });
+    return bound.toCompletableFuture().get();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testNativeApi() throws Exception {
+    Config conf = ConfigFactory.load();
+    ActorSystem sys = ActorSystem.create("HelloWorld", conf);
+    run(sys);
+
+    GrpcClientSettings settings =
+        GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091, 
sys).withTls(false);
+
+    GreeterServiceClient client = null;
+    try {
+      client = GreeterServiceClient.create(settings, sys);
+
+      // #client_request
+      HelloRequest request = 
HelloRequest.newBuilder().setName("Alice").build();
+      CompletionStage<HelloReply> response = client.sayHello(request);
+      StatusRuntimeException statusRuntimeException =
+          response
+              .toCompletableFuture()
+              .handle(
+                  (res, ex) -> {
+                    return (StatusRuntimeException) ex;
+                  })
+              .get();
+
+      GrpcServiceException ex = 
GrpcServiceException.apply(statusRuntimeException);
+      MetadataStatus meta = (MetadataStatus) ex.getMetadata();
+      assertEquals(
+          "type.googleapis.com/google.rpc.LocalizedMessage", 
meta.getDetails().get(0).typeUrl());
+
+      assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode());
+      assertEquals("What is wrong?", meta.getMessage());
+
+      LocalizedMessage details =
+          
meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion())
+              .get(0);
+      assertEquals("The password!", details.message());
+      assertEquals("EN", details.locale());
+      // #client_request
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail("Got unexpected error " + e.getMessage());
+    } finally {
+      if (client != null) client.close();
+      sys.terminate();
+    }
+  }
+}
diff --git 
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
 
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
index 5b32b794..66921a82 100644
--- 
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
+++ 
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
@@ -21,7 +21,6 @@ import java.util.concurrent.CompletionStage;
 import org.apache.pekko.NotUsed;
 import org.apache.pekko.grpc.GrpcServiceException;
 import org.apache.pekko.stream.javadsl.Source;
-import scala.jdk.javaapi.CollectionConverters;
 
 public class RichErrorNativeImpl implements GreeterService {
 
@@ -33,13 +32,13 @@ public class RichErrorNativeImpl implements GreeterService {
     ar.add(LocalizedMessage.of("EN", "The password!"));
 
     GrpcServiceException exception =
-        GrpcServiceException.apply(
-            Code.INVALID_ARGUMENT, "What is wrong?", 
CollectionConverters.asScala(ar).toSeq());
+        GrpcServiceException.create(Code.INVALID_ARGUMENT, "What is wrong?", 
ar);
 
     CompletableFuture<HelloReply> future = new CompletableFuture<>();
     future.completeExceptionally(exception);
     return future;
   }
+
   // #rich_error_model_unary
 
   @Override
diff --git 
a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala
 
b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala
new file mode 100644
index 00000000..596d29a8
--- /dev/null
+++ 
b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.grpc.scaladsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.grpc.{ GrpcClientSettings, GrpcServiceException }
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.testkit.TestKit
+import com.google.rpc.Code
+import com.google.rpc.error_details.LocalizedMessage
+import com.typesafe.config.ConfigFactory
+import example.myapp.helloworld.grpc.helloworld._
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Span
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+class RichErrorModelNativeSpec
+    extends TestKit(ActorSystem("RichErrorNativeSpec"))
+    with AnyWordSpecLike
+    with Matchers
+    with BeforeAndAfterAll
+    with ScalaFutures {
+
+  override implicit val patienceConfig: PatienceConfig = 
PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis))
+
+  implicit val sys: ActorSystem = system
+  implicit val ec: ExecutionContext = sys.dispatcher
+
+  object RichErrorNativeImpl extends GreeterService {
+
+    // #rich_error_model_unary
+    def sayHello(in: HelloRequest): Future[HelloReply] = {
+      Future.failed(
+        GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new 
LocalizedMessage("EN", "The password!"))))
+    }
+    // #rich_error_model_unary
+
+    def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
+      Source.failed(
+        GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new 
LocalizedMessage("EN", "The password!"))))
+    }
+
+    override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): 
Future[HelloReply] = {
+      in.runWith(Sink.seq).flatMap { _ =>
+        Future.failed(
+          GrpcServiceException(
+            Code.INVALID_ARGUMENT,
+            "What is wrong?",
+            Seq(new LocalizedMessage("EN", "The password!"))))
+      }
+    }
+
+    override def streamHellos(in: Source[HelloRequest, NotUsed]): 
Source[HelloReply, NotUsed] = {
+      Source.failed(
+        GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new 
LocalizedMessage("EN", "The password!"))))
+    }
+  }
+
+  val service: HttpRequest => Future[HttpResponse] =
+    GreeterServiceHandler(RichErrorNativeImpl)
+
+  val bound =
+    Http(system).newServerAt(interface = "127.0.0.1", port = 
0).bind(service).futureValue
+
+  val client = GreeterServiceClient(
+    GrpcClientSettings.connectToServiceAt("127.0.0.1", 
bound.localAddress.getPort).withTls(false))
+
+  val conf = 
ConfigFactory.load().withFallback(ConfigFactory.defaultApplication())
+
+  "Rich error model" should {
+
+    "work with the native api on a unary call" in {
+
+      // #client_request
+      val richErrorResponse = 
client.sayHello(HelloRequest("Bob")).failed.futureValue
+
+      richErrorResponse match {
+        case status: GrpcServiceException =>
+          status.metadata match {
+            case richMetadata: MetadataStatus =>
+              richMetadata.details(0).typeUrl should 
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+              import LocalizedMessage.messageCompanion
+              val localizedMessage = 
richMetadata.getParsedDetails[LocalizedMessage].head
+              localizedMessage.message should be("The password!")
+              localizedMessage.locale should be("EN")
+
+              richMetadata.code should be(3)
+              richMetadata.message should be("What is wrong?")
+
+            case other => fail(s"This should be a MetadataStatus but it is 
${other.getClass}")
+          }
+
+        case ex => fail(s"This should be a GrpcServiceException but it is 
${ex.getClass}")
+      }
+      // #client_request
+    }
+
+    "work with the native api on a stream request" in {
+
+      val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))
+
+      val richErrorResponse = 
client.itKeepsTalking(Source(requests)).failed.futureValue
+
+      richErrorResponse match {
+        case status: GrpcServiceException =>
+          status.metadata match {
+            case metadata: MetadataStatus =>
+              metadata.details(0).typeUrl should 
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+              import LocalizedMessage.messageCompanion
+              val localizedMessage = 
metadata.getParsedDetails[LocalizedMessage].head
+
+              metadata.code should be(3)
+              metadata.message should be("What is wrong?")
+              localizedMessage.message should be("The password!")
+              localizedMessage.locale should be("EN")
+
+            case other => fail(s"This should be a MetadataStatus but it is 
${other.getClass}")
+          }
+
+        case ex => fail(s"This should be a GrpcServiceException but it is 
${ex.getClass}")
+      }
+
+    }
+
+    "work with the native api on a stream response" in {
+      val richErrorResponseStream = client.itKeepsReplying(HelloRequest("Bob"))
+      val richErrorResponse =
+        richErrorResponseStream.run().failed.futureValue
+
+      richErrorResponse match {
+        case status: GrpcServiceException =>
+          status.metadata match {
+            case metadata: MetadataStatus =>
+              metadata.details(0).typeUrl should 
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+              val localizedMessage = 
metadata.getParsedDetails[LocalizedMessage].head
+
+              metadata.code should be(3)
+              metadata.message should be("What is wrong?")
+              localizedMessage.message should be("The password!")
+              localizedMessage.locale should be("EN")
+
+            case other => fail(s"This should be a MetadataStatus but it is 
${other.getClass}")
+          }
+
+        case ex => fail(s"This should be a GrpcServiceException but it is 
${ex.getClass}")
+      }
+
+    }
+
+    "work with the native api on a bidi stream" in {
+
+      val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))
+      val richErrorResponseStream = client.streamHellos(Source(requests))
+      val richErrorResponse =
+        richErrorResponseStream.run().failed.futureValue
+
+      richErrorResponse match {
+        case status: GrpcServiceException =>
+          status.metadata match {
+            case metadata: MetadataStatus =>
+              metadata.details(0).typeUrl should 
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+              val localizedMessage = 
metadata.getParsedDetails[LocalizedMessage].head
+
+              metadata.code should be(3)
+              metadata.message should be("What is wrong?")
+              localizedMessage.message should be("The password!")
+              localizedMessage.locale should be("EN")
+
+            case other => fail(s"This should be a MetadataStatus but it is 
${other.getClass}")
+          }
+
+        case ex => fail(s"This should be a GrpcServiceException but it is 
${ex.getClass}")
+      }
+
+    }
+
+  }
+
+  override def afterAll(): Unit = system.terminate().futureValue
+}
diff --git 
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
 
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
index bac878bc..d3d56c8f 100644
--- 
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
+++ 
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
@@ -61,6 +61,7 @@ class LiftedGreeterClient {
     CompletionStage<HelloReply> reply = client.sayHello().addHeader("key", 
"value").invoke(request);
     System.out.println("got single reply: " + 
reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
   }
+
   // #with-metadata
 
   private static void streamingRequest(GreeterServiceClient client) throws 
Exception {
diff --git 
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
 
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
index 49dff12a..c3c5752a 100644
--- 
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
+++ 
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
@@ -73,6 +73,7 @@ public class LoggingErrorHandlingGreeterServer {
       }
     }
   }
+
   // #implementation
 
   // #method
@@ -121,6 +122,7 @@ public class LoggingErrorHandlingGreeterServer {
                   }
                 }));
   }
+
   // #method
 
   // #custom-error-mapping
@@ -132,6 +134,7 @@ public class LoggingErrorHandlingGreeterServer {
           return null;
         }
       };
+
   // #custom-error-mapping
 
   public static CompletionStage<ServerBinding> run(ActorSystem sys) throws 
Exception {
diff --git 
a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala 
b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
index f0440a2b..191c4c62 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
@@ -17,12 +17,26 @@ import io.grpc.{ Status, StatusRuntimeException }
 import org.apache.pekko
 import pekko.annotation.ApiMayChange
 import pekko.grpc.scaladsl.{ Metadata, MetadataBuilder }
-import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl }
+import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl, 
RichGrpcMetadataImpl }
 import com.google.protobuf.any.Any
 import io.grpc.protobuf.StatusProto
+import scala.jdk.CollectionConverters._
 
 object GrpcServiceException {
 
+  /**
+   * Java API
+   */
+  def create(
+      code: com.google.rpc.Code,
+      message: String,
+      details: java.util.List[scalapb.GeneratedMessage]): GrpcServiceException 
= {
+    apply(code, message, details.asScala.toVector)
+  }
+
+  /**
+   * Scala API
+   */
   def apply(
       code: com.google.rpc.Code,
       message: String,
@@ -45,7 +59,7 @@ object GrpcServiceException {
   }
 
   def apply(ex: StatusRuntimeException): GrpcServiceException = {
-    new GrpcServiceException(ex.getStatus, new 
GrpcMetadataImpl(ex.getTrailers))
+    new GrpcServiceException(ex.getStatus, new 
RichGrpcMetadataImpl(ex.getStatus, ex.getTrailers))
   }
 }
 
diff --git 
a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala 
b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
index a814069f..9fbd1c9b 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
@@ -25,7 +25,10 @@ import pekko.http.scaladsl.model.HttpHeader
 import pekko.japi.Pair
 import pekko.util.ByteString
 import pekko.grpc.javadsl
-import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, StringEntry }
+import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, 
MetadataStatus, StringEntry }
+import com.google.protobuf.any
+import com.google.rpc.Status
+import scalapb.{ GeneratedMessage, GeneratedMessageCompanion }
 
 @InternalApi private[pekko] object MetadataImpl {
   val BINARY_SUFFIX: String = io.grpc.Metadata.BINARY_HEADER_SUFFIX
@@ -196,7 +199,7 @@ class HeaderMetadataImpl(headers: immutable.Seq[HttpHeader] 
= immutable.Seq.empt
  * @param delegate The underlying Scala metadata instance.
  */
 @InternalApi
-class JavaMetadataImpl(delegate: Metadata) extends javadsl.Metadata {
+class JavaMetadataImpl(val delegate: Metadata) extends javadsl.Metadata with 
javadsl.MetadataStatus {
   override def getText(key: String): Optional[String] =
     delegate.getText(key).toJava
 
@@ -214,4 +217,46 @@ class JavaMetadataImpl(delegate: Metadata) extends 
javadsl.Metadata {
 
   override def toString: String =
     delegate.toString
+
+  private def richDelegate =
+    delegate match {
+      case r: MetadataStatus => r
+      case other             => throw new IllegalArgumentException(s"Delegate 
metadata is not MetadataStatus but ${other.getClass}")
+    }
+
+  override def getStatus(): Status = richDelegate.status
+
+  override def getCode(): Int = richDelegate.code
+
+  override def getMessage(): String = richDelegate.message
+
+  private lazy val javaDetails: jList[com.google.protobuf.any.Any] = 
richDelegate.details.asJava
+  def getDetails(): jList[com.google.protobuf.any.Any] = javaDetails
+
+  def getParsedDetails[K <: GeneratedMessage](companion: 
GeneratedMessageCompanion[K]): jList[K] =
+    richDelegate.getParsedDetails(companion).asJava
+}
+
+class RichGrpcMetadataImpl(delegate: io.grpc.Status, meta: io.grpc.Metadata)
+    extends GrpcMetadataImpl(meta)
+    with MetadataStatus {
+  override val raw: Option[io.grpc.Metadata] = Some(meta)
+  override lazy val status: com.google.rpc.Status =
+    io.grpc.protobuf.StatusProto.fromStatusAndTrailers(delegate, meta)
+
+  override def code: Int = status.getCode
+  override def message: String = status.getMessage
+
+  override lazy val details: Seq[any.Any] = status.getDetailsList.asScala.map 
{ item =>
+    fromJavaProto(item)
+  }.toVector
+
+  def getParsedDetails[K <: scalapb.GeneratedMessage](
+      implicit companion: scalapb.GeneratedMessageCompanion[K]): Seq[K] = {
+    val typeUrl = "type.googleapis.com/" + companion.scalaDescriptor.fullName
+    details.filter(_.typeUrl == typeUrl).map(_.unpack)
+  }
+
+  private def fromJavaProto(javaPbSource: com.google.protobuf.Any): 
com.google.protobuf.any.Any =
+    com.google.protobuf.any.Any(typeUrl = javaPbSource.getTypeUrl, value = 
javaPbSource.getValue)
 }
diff --git 
a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
 
b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
index 6d3205d3..648c2021 100644
--- 
a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
+++ 
b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
@@ -18,8 +18,8 @@ import java.util.concurrent.CompletionStage
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.{ InternalApi, InternalStableApi }
-import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, 
GrpcSingleResponse }
-import pekko.stream.Materializer
+import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata, 
GrpcServiceException, GrpcSingleResponse }
+import pekko.stream.{ Graph, Materializer, SourceShape }
 import pekko.stream.javadsl.{ Source => JavaSource }
 import pekko.stream.scaladsl.{ Keep, Sink, Source }
 import pekko.util.ByteString
@@ -52,10 +52,12 @@ final class ScalaUnaryRequestBuilder[I, O](
     NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
 
   override def invoke(request: I): Future[O] =
-    channel.invoke(request, headers, descriptor, defaultOptions)
+    channel.invoke(request, headers, descriptor, 
defaultOptions).recoverWith(RequestBuilderImpl.richError)
 
   override def invokeWithMetadata(request: I): Future[GrpcSingleResponse[O]] =
-    channel.invokeWithMetadata(request, headers, descriptor, 
callOptionsWithDeadline())
+    channel
+      .invokeWithMetadata(request, headers, descriptor, 
callOptionsWithDeadline())
+      .recoverWith(RequestBuilderImpl.richError)
 
   override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I, 
O] =
     new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, 
settings, headers)
@@ -118,7 +120,7 @@ final class ScalaClientStreamingRequestBuilder[I, O](
     NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
 
   override def invoke(request: Source[I, NotUsed]): Future[O] =
-    invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic)
+    
invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic).recoverWith(RequestBuilderImpl.richError)
 
   override def invokeWithMetadata(source: Source[I, NotUsed]): 
Future[GrpcSingleResponse[O]] = {
     // a bit much overhead here because we are using the flow to represent a 
single response
@@ -144,6 +146,7 @@ final class ScalaClientStreamingRequestBuilder[I, O](
             def getTrailers() = metadata.getTrailers()
           }
       }(ExecutionContext.parasitic)
+      .recoverWith(RequestBuilderImpl.richError)
   }
 
   override def withHeaders(headers: MetadataImpl): 
ScalaClientStreamingRequestBuilder[I, O] =
@@ -207,10 +210,14 @@ final class ScalaServerStreamingRequestBuilder[I, O](
     NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
 
   override def invoke(request: I): Source[O, NotUsed] =
-    invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed)
+    invokeWithMetadata(request)
+      .mapMaterializedValue(_ => NotUsed)
+      .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
 
   override def invokeWithMetadata(request: I): Source[O, 
Future[GrpcResponseMetadata]] =
-    channel.invokeWithMetadata(Source.single(request), headers, descriptor, 
true, callOptionsWithDeadline())
+    channel
+      .invokeWithMetadata(Source.single(request), headers, descriptor, true, 
callOptionsWithDeadline())
+      .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
 
   override def withHeaders(headers: MetadataImpl): 
ScalaServerStreamingRequestBuilder[I, O] =
     new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel, 
defaultOptions, settings, headers)
@@ -274,10 +281,14 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, 
O](
     NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
 
   override def invoke(request: Source[I, NotUsed]): Source[O, NotUsed] =
-    invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed)
+    invokeWithMetadata(request)
+      .mapMaterializedValue(_ => NotUsed)
+      .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
 
   override def invokeWithMetadata(source: Source[I, NotUsed]): Source[O, 
Future[GrpcResponseMetadata]] =
-    channel.invokeWithMetadata(source, headers, descriptor, true, 
callOptionsWithDeadline())
+    channel
+      .invokeWithMetadata(source, headers, descriptor, true, 
callOptionsWithDeadline())
+      .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
 
   override def withHeaders(headers: MetadataImpl): 
ScalaBidirectionalStreamingRequestBuilder[I, O] =
     new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, 
defaultOptions, settings, headers)
@@ -331,3 +342,18 @@ trait MetadataOperations[T <: MetadataOperations[T]] {
   def addHeader(key: String, value: ByteString): T =
     withHeaders(headers = headers.addEntry(key, value))
 }
+
+object RequestBuilderImpl {
+  def richErrorStream[U]: PartialFunction[Throwable, Graph[SourceShape[U], 
NotUsed]] = {
+    case item => Source.failed(RequestBuilderImpl.lift(item))
+  }
+
+  def richError[U]: PartialFunction[Throwable, Future[U]] = {
+    case item => Future.failed(RequestBuilderImpl.lift(item))
+  }
+
+  def lift(item: Throwable): scala.Throwable = item match {
+    case ex: StatusRuntimeException => GrpcServiceException(ex)
+    case other                      => other
+  }
+}
diff --git 
a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala 
b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
index 4dc69c83..5a455bcc 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
@@ -59,3 +59,19 @@ trait Metadata {
    */
   def asScala: scaladsl.Metadata
 }
+
+/**
+ * Provides access to richer error details using the logical gRPC 
com.google.rpc.Status message, see
+ * [API Design Guide](https://cloud.google.com/apis/design/errors) for more 
details.
+ *
+ * Not for user extension
+ */
+@ApiMayChange
+@DoNotInherit
+trait MetadataStatus extends Metadata {
+  def getStatus(): com.google.rpc.Status
+  def getCode(): Int
+  def getMessage(): String
+  def getDetails(): List[com.google.protobuf.any.Any]
+  def getParsedDetails[K <: scalapb.GeneratedMessage](companion: 
scalapb.GeneratedMessageCompanion[K]): List[K]
+}
diff --git 
a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala 
b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
index d8ce6056..e566a789 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.grpc.scaladsl
 import org.apache.pekko
 import pekko.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
 import pekko.util.ByteString
+import com.google.protobuf.any
 
 /**
  * Immutable representation of the metadata in a call
@@ -55,3 +56,19 @@ import pekko.util.ByteString
   @ApiMayChange
   def asList: List[(String, MetadataEntry)]
 }
+
+/**
+ * Provides access to richer error details using the logical gRPC 
com.google.rpc.Status message, see
+ * [API Design Guide](https://cloud.google.com/apis/design/errors) for more 
details.
+ *
+ * Not for user extension
+ */
+@ApiMayChange
+@DoNotInherit
+trait MetadataStatus extends Metadata {
+  def status: com.google.rpc.Status
+  def code: Int
+  def message: String
+  def details: Seq[any.Any]
+  def getParsedDetails[K <: scalapb.GeneratedMessage](implicit msg: 
scalapb.GeneratedMessageCompanion[K]): Seq[K]
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to