This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new dbc9ed3a99 chore: Add support for controlling the NettyTransport's byteBuf allocator type. (#1707)
dbc9ed3a99 is described below

commit dbc9ed3a99b0d97a697b705ceb927c48d72eea90
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Tue Jan 14 18:41:35 2025 +0800

    chore: Add support for controlling the NettyTransport's byteBuf allocator type. (#1707)
    
    * chore: Add support for controlling the NettyTransport's byteBuf allocator type.
    
    * chore: extract deriveByteBufAllocator method
---
 remote/src/main/resources/reference.conf           |  8 ++++
 .../remote/transport/netty/NettyTransport.scala    | 26 ++++++++++++-
 .../org/apache/pekko/remote/RemoteConfigSpec.scala |  5 ++-
 .../remote/artery/BindCanonicalAddressSpec.scala   |  2 +-
 .../netty => transport}/NettyTransportSpec.scala   | 44 +++++++++++++++++-----
 5 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/remote/src/main/resources/reference.conf b/remote/src/main/resources/reference.conf
index 4c4edff858..4dfb6fedbf 100644
--- a/remote/src/main/resources/reference.conf
+++ b/remote/src/main/resources/reference.conf
@@ -621,6 +621,14 @@ pekko {
         # "off-for-windows" of course means that it's "on" for all other platforms
         tcp-reuse-addr = off-for-windows
 
+        # Used to control the Netty 4's ByteBufAllocator. The default is "pooled".
+        # pooled          : use a PooledByteBufAllocator.DEFAULT
+        # unpooled        : use an UnpooledByteBufAllocator.DEFAULT
+        # unpooled-heap   : use an UnpooledByteBufAllocator with prefer direct `false`
+        # adaptive        : use an AdaptiveByteBufAllocator
+        # adaptive-heap   : use an AdaptiveByteBufAllocator with prefer direct `false`
+        bytebuf-allocator-type = "pooled"
+
         # Used to configure the number of I/O worker threads on server sockets
         server-socket-worker-pool {
           # Min number of threads to cap factor-based number to
diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
index 0956548fdf..4016c99bcc 100644
--- a/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
+++ b/remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala
@@ -24,6 +24,7 @@ import scala.util.Try
 import scala.util.control.{ NoStackTrace, NonFatal }
 
 import com.typesafe.config.Config
+
 import org.apache.pekko
 import pekko.ConfigurationException
 import pekko.OnlyCauseStackTrace
@@ -38,7 +39,13 @@ import pekko.util.Helpers.Requiring
 import pekko.util.{ Helpers, OptionVal }
 
 import io.netty.bootstrap.{ Bootstrap => ClientBootstrap, ServerBootstrap }
-import io.netty.buffer.Unpooled
+import io.netty.buffer.{
+  AdaptiveByteBufAllocator,
+  ByteBufAllocator,
+  PooledByteBufAllocator,
+  Unpooled,
+  UnpooledByteBufAllocator
+}
 import io.netty.channel.{
   Channel,
   ChannelFuture,
@@ -160,6 +167,8 @@ class NettyTransportSettings(config: Config) {
     case _                 => getBoolean("tcp-reuse-addr")
   }
 
+  val ByteBufAllocator: ByteBufAllocator = NettyTransport.deriveByteBufAllocator(getString("bytebuf-allocator-type"))
+
   val Hostname: String = getString("hostname") match {
     case ""    => InetAddress.getLocalHost.getHostAddress
     case value => value
@@ -318,6 +327,17 @@ private[transport] object NettyTransport {
       systemName: String,
       hostName: Option[String]): Option[Address] =
     addressFromSocketAddress(addr, schemeIdentifier, systemName, hostName, port = None)
+
+  def deriveByteBufAllocator(allocatorType: String): ByteBufAllocator = allocatorType match {
+    case "pooled"        => PooledByteBufAllocator.DEFAULT
+    case "unpooled"      => UnpooledByteBufAllocator.DEFAULT
+    case "unpooled-heap" => new UnpooledByteBufAllocator(false)
+    case "adaptive"      => new AdaptiveByteBufAllocator()
+    case "adaptive-heap" => new AdaptiveByteBufAllocator(false)
+    case other => throw new IllegalArgumentException(
+        "Unknown 'bytebuf-allocator-type' [" + other + "]," +
+        " supported values are 'pooled', 'unpooled', 'unpooled-heap', 'adaptive', 'adaptive-heap'.")
+  }
 }
 
 @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
@@ -442,6 +462,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
     bootstrap.childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, settings.TcpNodelay)
     bootstrap.childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, settings.TcpKeepalive)
 
+    // Use the same allocator for inbound and outbound buffers
+    bootstrap.option(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
+    bootstrap.childOption(ChannelOption.ALLOCATOR, settings.ByteBufAllocator)
+
     settings.ReceiveBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_RCVBUF, sz))
     settings.SendBufferSize.foreach(sz => bootstrap.childOption[java.lang.Integer](ChannelOption.SO_SNDBUF, sz))
     settings.WriteBufferHighWaterMark.filter(_ > 0).foreach(sz =>
diff --git a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
index cff4fad3af..62434d809e 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/RemoteConfigSpec.scala
@@ -13,9 +13,11 @@
 
 package org.apache.pekko.remote
 
-import scala.concurrent.duration._
+import io.netty.buffer.PooledByteBufAllocator
 
+import scala.concurrent.duration._
 import scala.annotation.nowarn
+
 import language.postfixOps
 
 import org.apache.pekko
@@ -103,6 +105,7 @@ class RemoteConfigSpec extends PekkoSpec("""
       TcpNodelay should ===(true)
       TcpKeepalive should ===(true)
       TcpReuseAddr should ===(!Helpers.isWindows)
+      ByteBufAllocator should ===(PooledByteBufAllocator.DEFAULT)
       c.getString("hostname") should ===("")
       c.getString("bind-hostname") should ===("")
       c.getString("bind-port") should ===("")
diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
index 5f44b95652..58a6c222b7 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/artery/BindCanonicalAddressSpec.scala
@@ -24,7 +24,7 @@ import org.scalatest.wordspec.AnyWordSpec
 
 import org.apache.pekko
 import pekko.actor.{ ActorSystem, Address }
-import pekko.remote.classic.transport.netty.NettyTransportSpec._
+import pekko.remote.transport.NettyTransportSpec._
 import pekko.testkit.SocketUtil
 
 trait BindCanonicalAddressBehaviors {
diff --git a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala
similarity index 85%
rename from remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala
rename to remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala
index 3f84936b8d..436994de58 100644
--- a/remote/src/test/scala/org/apache/pekko/remote/classic/transport/netty/NettyTransportSpec.scala
+++ b/remote/src/test/scala/org/apache/pekko/remote/transport/NettyTransportSpec.scala
@@ -11,23 +11,25 @@
  * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
  */
 
-package org.apache.pekko.remote.classic.transport.netty
-
-import java.net.{ InetAddress, InetSocketAddress }
-import java.nio.channels.ServerSocketChannel
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
+package org.apache.pekko.remote.transport
 
 import com.typesafe.config.ConfigFactory
-import org.scalatest.matchers.should.Matchers
-import org.scalatest.wordspec.AnyWordSpec
+import io.netty.buffer.{ AdaptiveByteBufAllocator, PooledByteBufAllocator, UnpooledByteBufAllocator }
 
 import org.apache.pekko
 import pekko.actor.{ ActorSystem, Address, ExtendedActorSystem }
 import pekko.remote.BoundAddressesExtension
+import pekko.remote.transport.netty.NettyTransport.deriveByteBufAllocator
 import pekko.testkit.SocketUtil
 
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.net.{ InetAddress, InetSocketAddress }
+import java.nio.channels.ServerSocketChannel
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 object NettyTransportSpec {
   val commonConfig = ConfigFactory.parseString("""
     pekko.actor.provider = remote
@@ -132,6 +134,30 @@ class NettyTransportSpec extends AnyWordSpec with Matchers with BindBehavior {
 
       Await.result(sys.terminate(), Duration.Inf)
     }
+
+    "be able to specify byte buffer allocator" in {
+      deriveByteBufAllocator("pooled") should ===(PooledByteBufAllocator.DEFAULT)
+      deriveByteBufAllocator("unpooled") should ===(UnpooledByteBufAllocator.DEFAULT)
+
+      {
+        val allocator = deriveByteBufAllocator("unpooled-heap")
+        allocator shouldBe a[UnpooledByteBufAllocator]
+        allocator.toString.contains("directByDefault: false") should ===(true)
+      }
+
+      {
+        val allocator = deriveByteBufAllocator("adaptive")
+        allocator shouldBe a[AdaptiveByteBufAllocator]
+        allocator.toString.contains("directByDefault: true") should ===(true)
+      }
+
+      {
+        val allocator = deriveByteBufAllocator("adaptive-heap")
+        allocator shouldBe a[AdaptiveByteBufAllocator]
+        allocator.toString.contains("directByDefault: false") should ===(true)
+      }
+
+    }
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to