This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-grpc.git
The following commit(s) were added to refs/heads/main by this push:
new 82300037 Rich error model client API (#645)
82300037 is described below
commit 82300037e039bcd8fd23abf0de6ed3967e4cbc27
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Mar 6 16:17:18 2026 +0100
Rich error model client API (#645)
* Initial plan
* Add rich error model client API (MetadataStatus) from akka-grpc PRs #1665
and #1740
Co-authored-by: pjfanning <[email protected]>
* Fix doc comment grammar in MetadataStatus trait
Co-authored-by: pjfanning <[email protected]>
* format
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../myapp/helloworld/grpc/RichErrorImpl.java | 1 +
.../helloworld/grpc/RichErrorModelNativeTest.java | 103 +++++++++++
.../myapp/helloworld/grpc/RichErrorNativeImpl.java | 5 +-
.../grpc/scaladsl/RichErrorModelNativeSpec.scala | 204 +++++++++++++++++++++
.../myapp/helloworld/LiftedGreeterClient.java | 1 +
.../LoggingErrorHandlingGreeterServer.java | 3 +
.../apache/pekko/grpc/GrpcServiceException.scala | 18 +-
.../apache/pekko/grpc/internal/MetadataImpl.scala | 49 ++++-
.../pekko/grpc/internal/RequestBuilderImpl.scala | 44 ++++-
.../org/apache/pekko/grpc/javadsl/Metadata.scala | 16 ++
.../org/apache/pekko/grpc/scaladsl/Metadata.scala | 17 ++
11 files changed, 445 insertions(+), 16 deletions(-)
diff --git
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
index f7118b36..542431dc 100644
---
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
+++
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorImpl.java
@@ -48,6 +48,7 @@ public class RichErrorImpl implements GreeterService {
future.completeExceptionally(statusRuntimeException);
return future;
}
+
// #rich_error_model_unary
@Override
diff --git
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java
new file mode 100644
index 00000000..29cb4b59
--- /dev/null
+++
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorModelNativeTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package example.myapp.helloworld.grpc;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.rpc.error_details.LocalizedMessage;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletionStage;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.grpc.GrpcClientSettings;
+import org.apache.pekko.grpc.GrpcServiceException;
+import org.apache.pekko.grpc.javadsl.MetadataStatus;
+import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.http.javadsl.ServerBinding;
+import org.apache.pekko.http.javadsl.model.HttpRequest;
+import org.apache.pekko.http.javadsl.model.HttpResponse;
+import org.junit.Assert;
+import org.junit.Test;
+import org.scalatestplus.junit.JUnitSuite;
+
+public class RichErrorModelNativeTest extends JUnitSuite {
+
+ private ServerBinding run(ActorSystem sys) throws Exception {
+
+ GreeterService impl = new RichErrorNativeImpl();
+
+ org.apache.pekko.japi.function.Function<HttpRequest,
CompletionStage<HttpResponse>> service =
+ GreeterServiceHandlerFactory.create(impl, sys);
+ CompletionStage<ServerBinding> bound =
+ Http.get(sys).newServerAt("127.0.0.1", 8091).bind(service);
+
+ bound.thenAccept(
+ binding -> {
+ System.out.println("gRPC server bound to: " +
binding.localAddress());
+ });
+ return bound.toCompletableFuture().get();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testNativeApi() throws Exception {
+ Config conf = ConfigFactory.load();
+ ActorSystem sys = ActorSystem.create("HelloWorld", conf);
+ run(sys);
+
+ GrpcClientSettings settings =
+ GrpcClientSettings.connectToServiceAt("127.0.0.1", 8091,
sys).withTls(false);
+
+ GreeterServiceClient client = null;
+ try {
+ client = GreeterServiceClient.create(settings, sys);
+
+ // #client_request
+ HelloRequest request =
HelloRequest.newBuilder().setName("Alice").build();
+ CompletionStage<HelloReply> response = client.sayHello(request);
+ StatusRuntimeException statusRuntimeException =
+ response
+ .toCompletableFuture()
+ .handle(
+ (res, ex) -> {
+ return (StatusRuntimeException) ex;
+ })
+ .get();
+
+ GrpcServiceException ex =
GrpcServiceException.apply(statusRuntimeException);
+ MetadataStatus meta = (MetadataStatus) ex.getMetadata();
+ assertEquals(
+ "type.googleapis.com/google.rpc.LocalizedMessage",
meta.getDetails().get(0).typeUrl());
+
+ assertEquals(Status.INVALID_ARGUMENT.getCode().value(), meta.getCode());
+ assertEquals("What is wrong?", meta.getMessage());
+
+ LocalizedMessage details =
+
meta.getParsedDetails(com.google.rpc.error_details.LocalizedMessage.messageCompanion())
+ .get(0);
+ assertEquals("The password!", details.message());
+ assertEquals("EN", details.locale());
+ // #client_request
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Got unexpected error " + e.getMessage());
+ } finally {
+ if (client != null) client.close();
+ sys.terminate();
+ }
+ }
+}
diff --git
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
index 5b32b794..66921a82 100644
---
a/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
+++
b/interop-tests/src/test/java/example/myapp/helloworld/grpc/RichErrorNativeImpl.java
@@ -21,7 +21,6 @@ import java.util.concurrent.CompletionStage;
import org.apache.pekko.NotUsed;
import org.apache.pekko.grpc.GrpcServiceException;
import org.apache.pekko.stream.javadsl.Source;
-import scala.jdk.javaapi.CollectionConverters;
public class RichErrorNativeImpl implements GreeterService {
@@ -33,13 +32,13 @@ public class RichErrorNativeImpl implements GreeterService {
ar.add(LocalizedMessage.of("EN", "The password!"));
GrpcServiceException exception =
- GrpcServiceException.apply(
- Code.INVALID_ARGUMENT, "What is wrong?",
CollectionConverters.asScala(ar).toSeq());
+ GrpcServiceException.create(Code.INVALID_ARGUMENT, "What is wrong?",
ar);
CompletableFuture<HelloReply> future = new CompletableFuture<>();
future.completeExceptionally(exception);
return future;
}
+
// #rich_error_model_unary
@Override
diff --git
a/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala
b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala
new file mode 100644
index 00000000..596d29a8
--- /dev/null
+++
b/interop-tests/src/test/scala/org/apache/pekko/grpc/scaladsl/RichErrorModelNativeSpec.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.grpc.scaladsl
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.actor.ActorSystem
+import pekko.grpc.{ GrpcClientSettings, GrpcServiceException }
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
+import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.testkit.TestKit
+import com.google.rpc.Code
+import com.google.rpc.error_details.LocalizedMessage
+import com.typesafe.config.ConfigFactory
+import example.myapp.helloworld.grpc.helloworld._
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.Span
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.concurrent.duration._
+import scala.concurrent.{ ExecutionContext, Future }
+
+class RichErrorModelNativeSpec
+ extends TestKit(ActorSystem("RichErrorNativeSpec"))
+ with AnyWordSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with ScalaFutures {
+
+ override implicit val patienceConfig: PatienceConfig =
PatienceConfig(5.seconds, Span(10, org.scalatest.time.Millis))
+
+ implicit val sys: ActorSystem = system
+ implicit val ec: ExecutionContext = sys.dispatcher
+
+ object RichErrorNativeImpl extends GreeterService {
+
+ // #rich_error_model_unary
+ def sayHello(in: HelloRequest): Future[HelloReply] = {
+ Future.failed(
+ GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new
LocalizedMessage("EN", "The password!"))))
+ }
+ // #rich_error_model_unary
+
+ def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
+ Source.failed(
+ GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new
LocalizedMessage("EN", "The password!"))))
+ }
+
+ override def itKeepsTalking(in: Source[HelloRequest, NotUsed]):
Future[HelloReply] = {
+ in.runWith(Sink.seq).flatMap { _ =>
+ Future.failed(
+ GrpcServiceException(
+ Code.INVALID_ARGUMENT,
+ "What is wrong?",
+ Seq(new LocalizedMessage("EN", "The password!"))))
+ }
+ }
+
+ override def streamHellos(in: Source[HelloRequest, NotUsed]):
Source[HelloReply, NotUsed] = {
+ Source.failed(
+ GrpcServiceException(Code.INVALID_ARGUMENT, "What is wrong?", Seq(new
LocalizedMessage("EN", "The password!"))))
+ }
+ }
+
+ val service: HttpRequest => Future[HttpResponse] =
+ GreeterServiceHandler(RichErrorNativeImpl)
+
+ val bound =
+ Http(system).newServerAt(interface = "127.0.0.1", port =
0).bind(service).futureValue
+
+ val client = GreeterServiceClient(
+ GrpcClientSettings.connectToServiceAt("127.0.0.1",
bound.localAddress.getPort).withTls(false))
+
+ val conf =
ConfigFactory.load().withFallback(ConfigFactory.defaultApplication())
+
+ "Rich error model" should {
+
+ "work with the native api on a unary call" in {
+
+ // #client_request
+ val richErrorResponse =
client.sayHello(HelloRequest("Bob")).failed.futureValue
+
+ richErrorResponse match {
+ case status: GrpcServiceException =>
+ status.metadata match {
+ case richMetadata: MetadataStatus =>
+ richMetadata.details(0).typeUrl should
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+ import LocalizedMessage.messageCompanion
+ val localizedMessage =
richMetadata.getParsedDetails[LocalizedMessage].head
+ localizedMessage.message should be("The password!")
+ localizedMessage.locale should be("EN")
+
+ richMetadata.code should be(3)
+ richMetadata.message should be("What is wrong?")
+
+ case other => fail(s"This should be a MetadataStatus but it is
${other.getClass}")
+ }
+
+ case ex => fail(s"This should be a GrpcServiceException but it is
${ex.getClass}")
+ }
+ // #client_request
+ }
+
+ "work with the native api on a stream request" in {
+
+ val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))
+
+ val richErrorResponse =
client.itKeepsTalking(Source(requests)).failed.futureValue
+
+ richErrorResponse match {
+ case status: GrpcServiceException =>
+ status.metadata match {
+ case metadata: MetadataStatus =>
+ metadata.details(0).typeUrl should
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+ import LocalizedMessage.messageCompanion
+ val localizedMessage =
metadata.getParsedDetails[LocalizedMessage].head
+
+ metadata.code should be(3)
+ metadata.message should be("What is wrong?")
+ localizedMessage.message should be("The password!")
+ localizedMessage.locale should be("EN")
+
+ case other => fail(s"This should be a MetadataStatus but it is
${other.getClass}")
+ }
+
+ case ex => fail(s"This should be a GrpcServiceException but it is
${ex.getClass}")
+ }
+
+ }
+
+ "work with the native api on a stream response" in {
+ val richErrorResponseStream = client.itKeepsReplying(HelloRequest("Bob"))
+ val richErrorResponse =
+ richErrorResponseStream.run().failed.futureValue
+
+ richErrorResponse match {
+ case status: GrpcServiceException =>
+ status.metadata match {
+ case metadata: MetadataStatus =>
+ metadata.details(0).typeUrl should
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+ val localizedMessage =
metadata.getParsedDetails[LocalizedMessage].head
+
+ metadata.code should be(3)
+ metadata.message should be("What is wrong?")
+ localizedMessage.message should be("The password!")
+ localizedMessage.locale should be("EN")
+
+ case other => fail(s"This should be a MetadataStatus but it is
${other.getClass}")
+ }
+
+ case ex => fail(s"This should be a GrpcServiceException but it is
${ex.getClass}")
+ }
+
+ }
+
+ "work with the native api on a bidi stream" in {
+
+ val requests = List("Alice", "Bob", "Peter").map(HelloRequest(_))
+ val richErrorResponseStream = client.streamHellos(Source(requests))
+ val richErrorResponse =
+ richErrorResponseStream.run().failed.futureValue
+
+ richErrorResponse match {
+ case status: GrpcServiceException =>
+ status.metadata match {
+ case metadata: MetadataStatus =>
+ metadata.details(0).typeUrl should
be("type.googleapis.com/google.rpc.LocalizedMessage")
+
+ val localizedMessage =
metadata.getParsedDetails[LocalizedMessage].head
+
+ metadata.code should be(3)
+ metadata.message should be("What is wrong?")
+ localizedMessage.message should be("The password!")
+ localizedMessage.locale should be("EN")
+
+ case other => fail(s"This should be a MetadataStatus but it is
${other.getClass}")
+ }
+
+ case ex => fail(s"This should be a GrpcServiceException but it is
${ex.getClass}")
+ }
+
+ }
+
+ }
+
+ override def afterAll(): Unit = system.terminate().futureValue
+}
diff --git
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
index bac878bc..d3d56c8f 100644
---
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
+++
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LiftedGreeterClient.java
@@ -61,6 +61,7 @@ class LiftedGreeterClient {
CompletionStage<HelloReply> reply = client.sayHello().addHeader("key",
"value").invoke(request);
System.out.println("got single reply: " +
reply.toCompletableFuture().get(5, TimeUnit.SECONDS));
}
+
// #with-metadata
private static void streamingRequest(GreeterServiceClient client) throws
Exception {
diff --git
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
index 49dff12a..c3c5752a 100644
---
a/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
+++
b/plugin-tester-java/src/main/java/example/myapp/helloworld/LoggingErrorHandlingGreeterServer.java
@@ -73,6 +73,7 @@ public class LoggingErrorHandlingGreeterServer {
}
}
}
+
// #implementation
// #method
@@ -121,6 +122,7 @@ public class LoggingErrorHandlingGreeterServer {
}
}));
}
+
// #method
// #custom-error-mapping
@@ -132,6 +134,7 @@ public class LoggingErrorHandlingGreeterServer {
return null;
}
};
+
// #custom-error-mapping
public static CompletionStage<ServerBinding> run(ActorSystem sys) throws
Exception {
diff --git
a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
index f0440a2b..191c4c62 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/GrpcServiceException.scala
@@ -17,12 +17,26 @@ import io.grpc.{ Status, StatusRuntimeException }
import org.apache.pekko
import pekko.annotation.ApiMayChange
import pekko.grpc.scaladsl.{ Metadata, MetadataBuilder }
-import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl }
+import pekko.grpc.internal.{ GrpcMetadataImpl, JavaMetadataImpl,
RichGrpcMetadataImpl }
import com.google.protobuf.any.Any
import io.grpc.protobuf.StatusProto
+import scala.jdk.CollectionConverters._
object GrpcServiceException {
+ /**
+ * Java API
+ */
+ def create(
+ code: com.google.rpc.Code,
+ message: String,
+ details: java.util.List[scalapb.GeneratedMessage]): GrpcServiceException
= {
+ apply(code, message, details.asScala.toVector)
+ }
+
+ /**
+ * Scala API
+ */
def apply(
code: com.google.rpc.Code,
message: String,
@@ -45,7 +59,7 @@ object GrpcServiceException {
}
def apply(ex: StatusRuntimeException): GrpcServiceException = {
- new GrpcServiceException(ex.getStatus, new
GrpcMetadataImpl(ex.getTrailers))
+ new GrpcServiceException(ex.getStatus, new
RichGrpcMetadataImpl(ex.getStatus, ex.getTrailers))
}
}
diff --git
a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
index a814069f..9fbd1c9b 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/MetadataImpl.scala
@@ -25,7 +25,10 @@ import pekko.http.scaladsl.model.HttpHeader
import pekko.japi.Pair
import pekko.util.ByteString
import pekko.grpc.javadsl
-import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry, StringEntry }
+import pekko.grpc.scaladsl.{ BytesEntry, Metadata, MetadataEntry,
MetadataStatus, StringEntry }
+import com.google.protobuf.any
+import com.google.rpc.Status
+import scalapb.{ GeneratedMessage, GeneratedMessageCompanion }
@InternalApi private[pekko] object MetadataImpl {
val BINARY_SUFFIX: String = io.grpc.Metadata.BINARY_HEADER_SUFFIX
@@ -196,7 +199,7 @@ class HeaderMetadataImpl(headers: immutable.Seq[HttpHeader]
= immutable.Seq.empt
* @param delegate The underlying Scala metadata instance.
*/
@InternalApi
-class JavaMetadataImpl(delegate: Metadata) extends javadsl.Metadata {
+class JavaMetadataImpl(val delegate: Metadata) extends javadsl.Metadata with
javadsl.MetadataStatus {
override def getText(key: String): Optional[String] =
delegate.getText(key).toJava
@@ -214,4 +217,46 @@ class JavaMetadataImpl(delegate: Metadata) extends
javadsl.Metadata {
override def toString: String =
delegate.toString
+
+ private def richDelegate =
+ delegate match {
+ case r: MetadataStatus => r
+ case other => throw new IllegalArgumentException(s"Delegate
metadata is not MetadataStatus but ${other.getClass}")
+ }
+
+ override def getStatus(): Status = richDelegate.status
+
+ override def getCode(): Int = richDelegate.code
+
+ override def getMessage(): String = richDelegate.message
+
+ private lazy val javaDetails: jList[com.google.protobuf.any.Any] =
richDelegate.details.asJava
+ def getDetails(): jList[com.google.protobuf.any.Any] = javaDetails
+
+ def getParsedDetails[K <: GeneratedMessage](companion:
GeneratedMessageCompanion[K]): jList[K] =
+ richDelegate.getParsedDetails(companion).asJava
+}
+
+class RichGrpcMetadataImpl(delegate: io.grpc.Status, meta: io.grpc.Metadata)
+ extends GrpcMetadataImpl(meta)
+ with MetadataStatus {
+ override val raw: Option[io.grpc.Metadata] = Some(meta)
+ override lazy val status: com.google.rpc.Status =
+ io.grpc.protobuf.StatusProto.fromStatusAndTrailers(delegate, meta)
+
+ override def code: Int = status.getCode
+ override def message: String = status.getMessage
+
+ override lazy val details: Seq[any.Any] = status.getDetailsList.asScala.map
{ item =>
+ fromJavaProto(item)
+ }.toVector
+
+ def getParsedDetails[K <: scalapb.GeneratedMessage](
+ implicit companion: scalapb.GeneratedMessageCompanion[K]): Seq[K] = {
+ val typeUrl = "type.googleapis.com/" + companion.scalaDescriptor.fullName
+ details.filter(_.typeUrl == typeUrl).map(_.unpack)
+ }
+
+ private def fromJavaProto(javaPbSource: com.google.protobuf.Any):
com.google.protobuf.any.Any =
+ com.google.protobuf.any.Any(typeUrl = javaPbSource.getTypeUrl, value =
javaPbSource.getValue)
}
diff --git
a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
index 6d3205d3..648c2021 100644
---
a/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
+++
b/runtime/src/main/scala/org/apache/pekko/grpc/internal/RequestBuilderImpl.scala
@@ -18,8 +18,8 @@ import java.util.concurrent.CompletionStage
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.{ InternalApi, InternalStableApi }
-import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata,
GrpcSingleResponse }
-import pekko.stream.Materializer
+import pekko.grpc.{ GrpcClientSettings, GrpcResponseMetadata,
GrpcServiceException, GrpcSingleResponse }
+import pekko.stream.{ Graph, Materializer, SourceShape }
import pekko.stream.javadsl.{ Source => JavaSource }
import pekko.stream.scaladsl.{ Keep, Sink, Source }
import pekko.util.ByteString
@@ -52,10 +52,12 @@ final class ScalaUnaryRequestBuilder[I, O](
NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
override def invoke(request: I): Future[O] =
- channel.invoke(request, headers, descriptor, defaultOptions)
+ channel.invoke(request, headers, descriptor,
defaultOptions).recoverWith(RequestBuilderImpl.richError)
override def invokeWithMetadata(request: I): Future[GrpcSingleResponse[O]] =
- channel.invokeWithMetadata(request, headers, descriptor,
callOptionsWithDeadline())
+ channel
+ .invokeWithMetadata(request, headers, descriptor,
callOptionsWithDeadline())
+ .recoverWith(RequestBuilderImpl.richError)
override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I,
O] =
new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions,
settings, headers)
@@ -118,7 +120,7 @@ final class ScalaClientStreamingRequestBuilder[I, O](
NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
override def invoke(request: Source[I, NotUsed]): Future[O] =
- invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic)
+
invokeWithMetadata(request).map(_.value)(ExecutionContext.parasitic).recoverWith(RequestBuilderImpl.richError)
override def invokeWithMetadata(source: Source[I, NotUsed]):
Future[GrpcSingleResponse[O]] = {
// a bit much overhead here because we are using the flow to represent a
single response
@@ -144,6 +146,7 @@ final class ScalaClientStreamingRequestBuilder[I, O](
def getTrailers() = metadata.getTrailers()
}
}(ExecutionContext.parasitic)
+ .recoverWith(RequestBuilderImpl.richError)
}
override def withHeaders(headers: MetadataImpl):
ScalaClientStreamingRequestBuilder[I, O] =
@@ -207,10 +210,14 @@ final class ScalaServerStreamingRequestBuilder[I, O](
NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
override def invoke(request: I): Source[O, NotUsed] =
- invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed)
+ invokeWithMetadata(request)
+ .mapMaterializedValue(_ => NotUsed)
+ .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
override def invokeWithMetadata(request: I): Source[O,
Future[GrpcResponseMetadata]] =
- channel.invokeWithMetadata(Source.single(request), headers, descriptor,
true, callOptionsWithDeadline())
+ channel
+ .invokeWithMetadata(Source.single(request), headers, descriptor, true,
callOptionsWithDeadline())
+ .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
override def withHeaders(headers: MetadataImpl):
ScalaServerStreamingRequestBuilder[I, O] =
new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel,
defaultOptions, settings, headers)
@@ -274,10 +281,14 @@ final class ScalaBidirectionalStreamingRequestBuilder[I,
O](
NettyClientUtils.callOptionsWithDeadline(defaultOptions, settings)
override def invoke(request: Source[I, NotUsed]): Source[O, NotUsed] =
- invokeWithMetadata(request).mapMaterializedValue(_ => NotUsed)
+ invokeWithMetadata(request)
+ .mapMaterializedValue(_ => NotUsed)
+ .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
override def invokeWithMetadata(source: Source[I, NotUsed]): Source[O,
Future[GrpcResponseMetadata]] =
- channel.invokeWithMetadata(source, headers, descriptor, true,
callOptionsWithDeadline())
+ channel
+ .invokeWithMetadata(source, headers, descriptor, true,
callOptionsWithDeadline())
+ .recoverWithRetries(1, RequestBuilderImpl.richErrorStream)
override def withHeaders(headers: MetadataImpl):
ScalaBidirectionalStreamingRequestBuilder[I, O] =
new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel,
defaultOptions, settings, headers)
@@ -331,3 +342,18 @@ trait MetadataOperations[T <: MetadataOperations[T]] {
def addHeader(key: String, value: ByteString): T =
withHeaders(headers = headers.addEntry(key, value))
}
+
+object RequestBuilderImpl {
+ def richErrorStream[U]: PartialFunction[Throwable, Graph[SourceShape[U],
NotUsed]] = {
+ case item => Source.failed(RequestBuilderImpl.lift(item))
+ }
+
+ def richError[U]: PartialFunction[Throwable, Future[U]] = {
+ case item => Future.failed(RequestBuilderImpl.lift(item))
+ }
+
+ def lift(item: Throwable): scala.Throwable = item match {
+ case ex: StatusRuntimeException => GrpcServiceException(ex)
+ case other => other
+ }
+}
diff --git
a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
index 4dc69c83..5a455bcc 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/Metadata.scala
@@ -59,3 +59,19 @@ trait Metadata {
*/
def asScala: scaladsl.Metadata
}
+
+/**
+ * Provides access to richer error details using the logical gRPC
com.google.rpc.Status message, see
+ * [API Design Guide](https://cloud.google.com/apis/design/errors) for more
details.
+ *
+ * Not for user extension
+ */
+@ApiMayChange
+@DoNotInherit
+trait MetadataStatus extends Metadata {
+ def getStatus(): com.google.rpc.Status
+ def getCode(): Int
+ def getMessage(): String
+ def getDetails(): List[com.google.protobuf.any.Any]
+ def getParsedDetails[K <: scalapb.GeneratedMessage](companion:
scalapb.GeneratedMessageCompanion[K]): List[K]
+}
diff --git
a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
index d8ce6056..e566a789 100644
--- a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
+++ b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/Metadata.scala
@@ -16,6 +16,7 @@ package org.apache.pekko.grpc.scaladsl
import org.apache.pekko
import pekko.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
import pekko.util.ByteString
+import com.google.protobuf.any
/**
* Immutable representation of the metadata in a call
@@ -55,3 +56,19 @@ import pekko.util.ByteString
@ApiMayChange
def asList: List[(String, MetadataEntry)]
}
+
+/**
+ * Provides access to richer error details using the logical gRPC
com.google.rpc.Status message, see
+ * [API Design Guide](https://cloud.google.com/apis/design/errors) for more
details.
+ *
+ * Not for user extension
+ */
+@ApiMayChange
+@DoNotInherit
+trait MetadataStatus extends Metadata {
+ def status: com.google.rpc.Status
+ def code: Int
+ def message: String
+ def details: Seq[any.Any]
+ def getParsedDetails[K <: scalapb.GeneratedMessage](implicit msg:
scalapb.GeneratedMessageCompanion[K]): Seq[K]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]