This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 9288d4c6f1 =remote Drop the blocking usage.
9288d4c6f1 is described below
commit 9288d4c6f16dfd71cf4d5dd81912c673680e2053
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 16 14:03:33 2023 +0800
=remote Drop the blocking usage.
---
.../remote/transport/netty/NettyTransport.scala | 23 ++++++++++++++--------
1 file changed, 15 insertions(+), 8 deletions(-)
diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
index d7ff5dca8f..f52d4540e3 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
@@ -68,6 +68,17 @@ object NettyFutureBridge {
p.future
}
+ private[transport] def apply[T](nettyFuture: io.netty.util.concurrent.Future[T]): Future[T] = {
+ val p = Promise[T]()
+ nettyFuture.addListener((future: io.netty.util.concurrent.Future[T]) =>
+ p.complete(
+ Try(
+ if (future.isSuccess) future.get()
+ else if (future.isCancelled) throw new CancellationException
+ else throw future.cause())))
+ p.future
+ }
+
def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
import pekko.util.ccompat.JavaConverters._
val p = Promise[ChannelGroup]()
@@ -540,14 +551,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
(for {
(host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
- readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { channel =>
- if (EnableSsl)
- blocking {
- channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
- }
- channel.config.setAutoRead(false)
- channel
- }
+ channel <- NettyFutureBridge(bootstrap.connect(host, port))
+ readyChannel <- if (EnableSsl) {
+ NettyFutureBridge(channel.pipeline().get(classOf[SslHandler]).handshakeFuture())
+ } else Future.successful(channel)
handle <- readyChannel.pipeline().get(classOf[ClientHandler]).statusFuture
} yield handle).recover {
case _: CancellationException => throw new NettyTransportExceptionNoStack("Connection was cancelled")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]