This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 62bf7cbc7d =remote Make ues of Netty's default resolver.
62bf7cbc7d is described below
commit 62bf7cbc7dd43b2540d211eee2accf7d9c4de2e2
Author: He-Pin <[email protected]>
AuthorDate: Sat Sep 16 04:06:46 2023 +0800
=remote Make ues of Netty's default resolver.
---
.../remote/transport/netty/NettyTransport.scala | 40 +++++++++++++---------
1 file changed, 24 insertions(+), 16 deletions(-)
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
index c331bd5938..d7ff5dca8f 100644
---
a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
+++
b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
@@ -463,22 +463,24 @@ class NettyTransport(val settings:
NettyTransportSettings, val system: ExtendedA
override def isResponsibleFor(address: Address): Boolean = true // TODO: Add
configurable subnet filtering
// TODO: This should be factored out to an async (or thread-isolated) name
lookup service #2960
+ // Keep this for binary compatibility reasons
def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr
match {
case Address(_, _, Some(host), Some(port)) =>
- Future { blocking { new InetSocketAddress(InetAddress.getByName(host),
port) } }
- case _ => Future.failed(new IllegalArgumentException(s"Address [$addr]
does not contain host or port information."))
+ Future {
+ blocking {
+ new InetSocketAddress(InetAddress.getByName(host), port)
+ }
+ }
+ case _ =>
+ Future.failed(new IllegalArgumentException(s"Address [$addr] must
contain both host and port information."))
}
override def listen: Future[(Address, Promise[AssociationEventListener])] = {
@nowarn("msg=deprecated")
val bindPort = settings.BindPortSelector
-
- for {
- address <- addressToSocketAddress(Address("", "", settings.BindHostname,
bindPort))
- } yield {
+ Future.fromTry(Try {
try {
- val newServerChannel = inboundBootstrap.bind(address).sync().channel()
-
+ val newServerChannel = inboundBootstrap.bind(settings.BindHostname,
bindPort).sync().channel()
// Block reads until a handler actor is registered
newServerChannel.config().setAutoRead(false)
channelGroup.add(newServerChannel)
@@ -511,28 +513,34 @@ class NettyTransport(val settings:
NettyTransportSettings, val system: ExtendedA
s"Unknown local address type
[${newServerChannel.localAddress().getClass.getName}]")
}
} catch {
- case NonFatal(e) => {
- log.error("failed to bind to {}, shutting down Netty transport",
address)
+ case NonFatal(e) =>
+ log.error("failed to bind to host:{} port:{}, shutting down Netty
transport", settings.BindHostname, bindPort)
try {
shutdown()
- } catch { case NonFatal(_) => } // ignore possible exception during
shutdown
- throw e
- }
+ } catch {
+ case NonFatal(_) =>
+ } // ignore possible exception during shutdown
+ throw e;
}
- }
+ })
}
// Need to do like this for binary compatibility reasons
private[pekko] def boundAddress = boundTo
+ private def extractHostAndPort(addr: Address): (String, Int) = addr match {
+ case Address(_, _, Some(host), Some(port)) => (host, port)
+ case _ => throw new
IllegalArgumentException(s"Address [$addr] must contain both host and port
information.")
+ }
+
override def associate(remoteAddress: Address): Future[AssociationHandle] = {
if (!serverChannel.isActive) Future.failed(new
NettyTransportException("Transport is not bound"))
else {
val bootstrap: ClientBootstrap = outboundBootstrap(remoteAddress)
(for {
- socketAddress <- addressToSocketAddress(remoteAddress)
- readyChannel <-
NettyFutureBridge(bootstrap.connect(socketAddress)).map { channel =>
+ (host, port) <- Future.fromTry(Try(extractHostAndPort(remoteAddress)))
+ readyChannel <- NettyFutureBridge(bootstrap.connect(host, port)).map {
channel =>
if (EnableSsl)
blocking {
channel.pipeline().get(classOf[SslHandler]).handshakeFuture().awaitUninterruptibly()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]