This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new dbc9ed3a99 chore: Add support for controlling the NettyTransport's
byteBuf allocator type. (#1707)
dbc9ed3a99 is described below
commit dbc9ed3a99b0d97a697b705ceb927c48d72eea90
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Jan 14 18:41:35 2025 +0800
chore: Add support for controlling the NettyTransport's byteBuf allocator
type. (#1707)
* chore: Add support for controlling the NettyTransport's byteBuf allocator
type.
* chore: extract deriveByteBufAllocator method
---
remote/src/main/resources/reference.conf | 8 ++++
.../remote/transport/netty/NettyTransport.scala | 26 ++++++++++++-
.../org/apache/pekko/remote/RemoteConfigSpec.scala | 5 ++-
.../remote/artery/BindCanonicalAddressSpec.scala | 2 +-
.../netty => transport}/NettyTransportSpec.scala | 44 +++++++++++++++++-----
5 files changed, 73 insertions(+), 12 deletions(-)
diff --git a/remote/src/main/resources/reference.conf
b/remote/src/main/resources/reference.conf
index 4c4edff858..4dfb6fedbf 100644
--- a/remote/src/main/resources/reference.conf
+++ b/remote/src/main/resources/reference.conf
@@ -621,6 +621,14 @@ pekko {
# "off-for-windows" of course means that it's "on" for all other
platforms
tcp-reuse-addr = off-for-windows
+ # Used to control the Netty 4's ByteBufAllocator. The default is
"pooled".
+ # pooled : use a PooledByteBufAllocator.DEFAULT
+ # unpooled : use an UnpooledByteBufAllocator.DEFAULT
+ # unpooled-heap : use an UnpooledByteBufAllocator with prefer direct
`false`
+ # adaptive : use an AdaptiveByteBufAllocator
+ # adaptive-heap : use an AdaptiveByteBufAllocator with prefer direct
`false`
+ bytebuf-allocator-type = "pooled"
+
# Used to configure the number of I/O worker threads on server sockets
server-socket-worker-pool {
# Min number of threads to cap factor-based number to
diff --git
a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
index 0956548fdf..4016c99bcc 100644
---
a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
+++
b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
@@ -24,6 +24,7 @@ import scala.util.Try
import scala.util.control.{ NoStackTrace, NonFatal }
import com.typesafe.config.Config
+
import org.apache.pekko
import pekko.ConfigurationException
import pekko.OnlyCauseStackTrace
@@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
import pekko.util.{ Helpers, OptionVal }
import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
-import io.netty.buffer.Unpooled
+import io.netty.buffer.{
+ AdaptiveByteBufAllocator,
+ ByteBufAllocator,
+ PooledByteBufAllocator,
+ Unpooled,
+ UnpooledByteBufAllocator
+}
import io.netty.channel.{
Channel,
ChannelFuture,
@@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) {
case _ => getBoolean("tcp-reuse-addr")
}
+ val ByteBufAllocator: ByteBufAllocator =
NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))
+
val Hostname: String = getString("hostname") match {
case "" => InetAddress.getLocalHost.getHostAddress
case value => value
@@ -318,6 +327,17 @@ private[transport] object NettyTransport {
systemName: String,
hostName: Option[String]): Option[Address] =
addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName,
port = None)
+
+ def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator =
allocatorType match {
+ case "pooled" => PooledByteBufAllocator.DEFAULT
+ case "unpooled" => UnpooledByteBufAllocator.DEFAULT
+ case "unpooled-heap" => new UnpooledByteBufAllocator(false)
+ case "adaptive" => new AdaptiveByteBufAllocator()
+ case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
+ case other => throw new IllegalArgumentException(
+ "Unknown 'bytebuf-allocator-type' [" + other + "]," +
+ " supported values are 'pooled', 'unpooled', 'unpooled-heap',
'adaptive', 'adaptive-heap'.")
+ }
}
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
@@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings,
val system: ExtendedA
bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY,
settings.TcpNodelay)
bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE,
settings.TcpKeepalive)
+ // Use the same allocator for inbound and outbound buffers
+ bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
+ bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
+
settings.ReceiveBufferSize.foreach(sz =>
bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
settings.SendBufferSize.foreach(sz =>
bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
index cff4fad3af..62434d809e 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
@@ -13,9 +13,11 @@
package org.apache.pekko.remote
-import scala.concurrent.duration._
+import io.netty.buffer.PooledByteBufAllocator
+import scala.concurrent.duration._
import scala.annotation.nowarn
+
import language.postfixOps
import org.apache.pekko
@@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
TcpNodelay should ===(true)
TcpKeepalive should ===(true)
TcpReuseAddr should ===(!Helpers.isWindows)
+ ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
c.getString("hostname") should ===("")
c.getString("bind-hostname") should ===("")
c.getString("bind-port") should ===("")
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
index 5f44b95652..58a6c222b7 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
@@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec
import org.apache.pekko
import pekko.actor.{ ActorSystem, Address }
-import pekko.remote.classic.transport.netty.NettyTransportSpec._
+import pekko.remote.transport.NettyTransportSpec._
import pekko.testkit.SocketUtil
trait BindCanonicalAddressBehaviors {
diff --git
a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala
b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala
similarity index 85%
rename from
remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala
rename to
remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala
index 3f84936b8d..436994de58 100644
---
a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala
+++
b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala
@@ -11,23 +11,25 @@
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/
-package org.apache.pekko.remote.classic.transport.netty
-
-import java.net.{ InetAddress, InetSocketAddress }
-import java.nio.channels.ServerSocketChannel
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+package org.apache.pekko.remote.transport
import com.typesafe.config.ConfigFactory
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpec
+import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator,
UnpooledByteBufAllocator }
import org.apache.pekko
import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
import pekko.remote.BoundAddressesExtension
+import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
import pekko.testkit.SocketUtil
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.net.{ InetAddress, InetSocketAddress }
+import java.nio.channels.ServerSocketChannel
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
object NettyTransportSpec {
val commonConfig = ConfigFactory.parseString("""
pekko.actor.provider = remote
@@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers
with BindBehavior {
Await.result(sys.terminate(), Duration.Inf)
}
+
+ "be able to specify byte buffer allocator" in {
+ deriveByteBufAllocator("pooled") should
===(PooledByteBufAllocator.DEFAULT)
+ deriveByteBufAllocator("unpooled") should
===(UnpooledByteBufAllocator.DEFAULT)
+
+ {
+ val allocator = deriveByteBufAllocator("unpooled-heap")
+ allocator shouldBe a[UnpooledByteBufAllocator]
+ allocator.toString.contains("directByDefault: false") should ===(true)
+ }
+
+ {
+ val allocator = deriveByteBufAllocator("adaptive")
+ allocator shouldBe a[AdaptiveByteBufAllocator]
+ allocator.toString.contains("directByDefault: true") should ===(true)
+ }
+
+ {
+ val allocator = deriveByteBufAllocator("adaptive-heap")
+ allocator shouldBe a[AdaptiveByteBufAllocator]
+ allocator.toString.contains("directByDefault: false") should ===(true)
+ }
+
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]