This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 9288d4c6f1 =remote Drop the blocking usage.
9288d4c6f1 is described below

commit 9288d4c6f16dfd71cf4d5dd81912c673680e2053
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 16 14:03:33 2023 +0800

    =remote Drop the blocking usage.
---
 .../remote/transport/netty/NettyTransport.scala    | 23 ++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git 
a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
 
b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
index d7ff5dca8f..f52d4540e3 100644
--- 
a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
+++ 
b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
@@ -68,6 +68,17 @@ object NettyFutureBridge {
     p.future
   }
 
+  private[transport] def apply[T](nettyFuture: 
io.netty.util.concurrent.Future[T]): Future[T] = {
+    val p = Promise[T]()
+    nettyFuture.addListener((future: io.netty.util.concurrent.Future[T]) =>
+      p.complete(
+        Try(
+          if (future.isSuccess) future.get()
+          else if (future.isCancelled) throw new CancellationException
+          else throw future.cause())))
+    p.future
+  }
+
   def apply(nettyFuture: ChannelGroupFuture): Future[ChannelGroup] = {
     import pekko.util.ccompat.JavaConverters._
     val p = Promise[ChannelGroup]()
@@ -540,14 +551,10 @@ class NettyTransport(val settings: 
NettyTransportSettings, val system: ExtendedA
 
       (for {
         (host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
-        readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map { 
channel =>
-          if (EnableSsl)
-            blocking {
-              
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
-            }
-          channel.config.setAutoRead(false)
-          channel
-        }
+        channel <- NettyFutureBridge(bootstrap.connect(host, port))
+        readyChannel <- if (EnableSsl) {
+          
NettyFutureBridge(channel.pipeline().get(classOf[SslHandler]).handshakeFuture())
+        } else Future.successful(channel)
         handle <- 
readyChannel.pipeline().get(classOf[ClientHandler]).statusFuture
       } yield handle).recover {
         case _: CancellationException => throw new 
NettyTransportExceptionNoStack("Connection was cancelled")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to