This is an automated email from the ASF dual-hosted git repository.

He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 8116ff7c5e feat: support SO_REUSEPORT socket option (#2915)
8116ff7c5e is described below

commit 8116ff7c5e9fa0039c0cd67e5e6870e8ac2a632d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 8 21:03:18 2026 +0800

    feat: support SO_REUSEPORT socket option (#2915)
    
    * feat: support SO_REUSEPORT socket option
    
    * Fix license header in InetSocketOptionSpec.scala
    
    The new test file incorrectly used the derived-from-Akka header with
    Lightbend copyright. Since this is a brand new file created for the
    SO_REUSEPORT feature, it should use the standard Apache-only license
    header like other new files in the project.
    
    🤖 Generated with [Qoder](https://qoder.com)
    
    * Address SO_REUSEPORT review feedback
---
 .../org/apache/pekko/io/InetSocketOptionSpec.scala | 82 ++++++++++++++++++++++
 .../org/apache/pekko/io/TcpIntegrationSpec.scala   | 18 ++++-
 .../org/apache/pekko/io/UdpIntegrationSpec.scala   | 14 ++++
 .../src/main/scala/org/apache/pekko/io/Inet.scala  | 39 ++++++++++
 .../apache/pekko/io/TcpOutgoingConnection.scala    | 10 +--
 .../scala/org/apache/pekko/io/UdpManager.scala     |  2 +-
 .../main/scala/org/apache/pekko/io/UdpSender.scala | 50 ++++++++-----
 7 files changed, 193 insertions(+), 22 deletions(-)

diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/InetSocketOptionSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/InetSocketOptionSpec.scala
new file mode 100644
index 0000000000..1825cfd1eb
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/io/InetSocketOptionSpec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.io
+
+import java.net.StandardSocketOptions
+import java.nio.channels.DatagramChannel
+import java.nio.channels.NetworkChannel
+import java.nio.channels.ServerSocketChannel
+import java.nio.channels.SocketChannel
+
+import org.apache.pekko
+import pekko.testkit.PekkoSpec
+
+class InetSocketOptionSpec extends PekkoSpec {
+
+  "Inet.SO.ReusePort" must {
+
+    "set SO_REUSEPORT on TCP server sockets before bind" in {
+      val channel = ServerSocketChannel.open()
+      try {
+        pendingIfReusePortUnsupported(channel)
+
+        Inet.SO.ReusePort(true).beforeServerSocketBind(channel.socket)
+
+        channel.getOption(StandardSocketOptions.SO_REUSEPORT).booleanValue should ===(true)
+      } finally {
+        channel.close()
+      }
+    }
+
+    "set SO_REUSEPORT on TCP client sockets before connect" in {
+      val channel = SocketChannel.open()
+      try {
+        pendingIfReusePortUnsupported(channel)
+
+        Tcp.SO.ReusePort(true).beforeConnect(channel.socket)
+
+        channel.getOption(StandardSocketOptions.SO_REUSEPORT).booleanValue should ===(true)
+      } finally {
+        channel.close()
+      }
+    }
+
+    "set SO_REUSEPORT on UDP sockets before bind" in {
+      val channel = DatagramChannel.open()
+      try {
+        pendingIfReusePortUnsupported(channel)
+
+        Udp.SO.ReusePort(true).beforeDatagramBind(channel.socket)
+
+        channel.getOption(StandardSocketOptions.SO_REUSEPORT).booleanValue should ===(true)
+      } finally {
+        channel.close()
+      }
+    }
+
+    "be available through the Java TCP and UDP socket option APIs" in {
+      TcpSO.reusePort(true) should ===(Inet.SO.ReusePort(true))
+      UdpSO.reusePort(false) should ===(Inet.SO.ReusePort(false))
+    }
+  }
+
+  private def pendingIfReusePortUnsupported(channel: NetworkChannel): Unit =
+    if (!channel.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT)) {
+      pending
+    }
+}
diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala
index f7e3b857fe..e4de5a636e 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala
@@ -14,7 +14,7 @@
 package org.apache.pekko.io
 
 import java.io.IOException
-import java.net.{ InetSocketAddress, ServerSocket }
+import java.net.{ InetSocketAddress, ServerSocket, Socket }
 
 import scala.concurrent.duration._
 
@@ -184,6 +184,22 @@ class TcpIntegrationSpec extends PekkoSpec("""
       replies should ===(Nil)
     }
 
+    "reply with CommandFailed when a connect socket option fails before connect" in {
+      val connectCommander = TestProbe()
+      val failure = new UnsupportedOperationException("boom")
+      val failingOption = new Inet.SocketOption {
+        override def beforeConnect(s: Socket): Unit = throw failure
+      }
+      val endpoint = new InetSocketAddress("127.0.0.1", 1)
+      val command = Connect(endpoint, options = List(failingOption))
+
+      connectCommander.send(IO(Tcp), command)
+
+      val commandFailed = connectCommander.expectMsgType[CommandFailed]
+      commandFailed.cmd should ===(command)
+      commandFailed.cause should ===(Some(failure))
+    }
+
     "handle tcp connection actor death properly" in new TestSetup(shouldBindServer = false) {
       val serverSocket = new ServerSocket(endpoint.getPort(), 100, endpoint.getAddress())
       val connectCommander = TestProbe()
diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala
index 957b7828e8..9a20d7bd9a 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala
@@ -129,6 +129,15 @@ class UdpIntegrationSpec extends PekkoSpec("""
       commander.expectMsgType[Bound]
       assert(assertOption.openCalled === 1)
     }
+
+    "reply with CommandFailed when a simple sender socket option fails before registration" in {
+      val commander = TestProbe()
+      val command = SimpleSender(options = List(FailBeforeDatagramBind()))
+
+      commander.send(IO(Udp), command)
+
+      commander.expectMsg(CommandFailed(command))
+    }
   }
 
 }
@@ -162,3 +171,8 @@ private case class AssertOpenDatagramChannel() extends DatagramChannelCreator {
     super.create()
   }
 }
+
+private case class FailBeforeDatagramBind() extends SocketOption {
+  override def beforeDatagramBind(ds: DatagramSocket): Unit =
+    throw new UnsupportedOperationException("boom")
+}
diff --git a/actor/src/main/scala/org/apache/pekko/io/Inet.scala b/actor/src/main/scala/org/apache/pekko/io/Inet.scala
index ccf28a98bd..26648029b1 100644
--- a/actor/src/main/scala/org/apache/pekko/io/Inet.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/Inet.scala
@@ -16,12 +16,20 @@ package org.apache.pekko.io
 import java.net.DatagramSocket
 import java.net.ServerSocket
 import java.net.Socket
+import java.net.StandardSocketOptions
 import java.nio.channels.DatagramChannel
+import java.nio.channels.NetworkChannel
 
 import scala.annotation.nowarn
 
 object Inet {
 
+  private def setReusePort(channel: NetworkChannel, on: Boolean): Unit =
+    if (channel eq null)
+      throw new UnsupportedOperationException("SO_REUSEPORT can only be set on channel-backed sockets")
+    else
+      channel.setOption(StandardSocketOptions.SO_REUSEPORT, java.lang.Boolean.valueOf(on))
+
   /**
    * SocketOption is a package of data (from the user) and associated
    * behavior (how to apply that to a channel).
@@ -126,6 +134,19 @@ object Inet {
       override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on)
     }
 
+    /**
+     * [[pekko.io.Inet.SocketOption]] to enable or disable SO_REUSEPORT
+     *
+     * For more information see [[java.net.StandardSocketOptions#SO_REUSEPORT]]
+     *
+     * @since 2.0.0
+     */
+    final case class ReusePort(on: Boolean) extends SocketOption {
+      override def beforeServerSocketBind(s: ServerSocket): Unit = setReusePort(s.getChannel, on)
+      override def beforeDatagramBind(s: DatagramSocket): Unit = setReusePort(s.getChannel, on)
+      override def beforeConnect(s: Socket): Unit = setReusePort(s.getChannel, on)
+    }
+
     /**
      * [[pekko.io.Inet.SocketOption]] to set the SO_SNDBUF option.
      *
@@ -166,6 +187,15 @@ object Inet {
      */
     val ReuseAddress = SO.ReuseAddress
 
+    /**
+     * [[pekko.io.Inet.SocketOption]] to enable or disable SO_REUSEPORT
+     *
+     * For more information see [[java.net.StandardSocketOptions#SO_REUSEPORT]]
+     *
+     * @since 2.0.0
+     */
+    def ReusePort = SO.ReusePort
+
     /**
      * [[pekko.io.Inet.SocketOption]] to set the SO_SNDBUF option.
      *
@@ -200,6 +230,15 @@ object Inet {
      */
     def reuseAddress(on: Boolean) = ReuseAddress(on)
 
+    /**
+     * [[pekko.io.Inet.SocketOption]] to enable or disable SO_REUSEPORT
+     *
+     * For more information see [[java.net.StandardSocketOptions#SO_REUSEPORT]]
+     *
+     * @since 2.0.0
+     */
+    def reusePort(on: Boolean) = ReusePort(on)
+
     /**
      * [[pekko.io.Inet.SocketOption]] to set the SO_SNDBUF option.
      *
diff --git a/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala b/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala
index f257eceb85..4982f7061a 100644
--- a/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala
@@ -50,10 +50,12 @@ private[io] class TcpOutgoingConnection(
 
   signDeathPact(commander)
 
-  options.foreach(_.beforeConnect(channel.socket))
-  localAddress.foreach(channel.socket.bind)
-  channelRegistry.register(channel, 0)
-  timeout.foreach(context.setReceiveTimeout) // Initiate connection timeout if supplied
+  reportConnectFailure {
+    options.foreach(_.beforeConnect(channel.socket))
+    localAddress.foreach(channel.socket.bind)
+    channelRegistry.register(channel, 0)
+    timeout.foreach(context.setReceiveTimeout) // Initiate connection timeout if supplied
+  }
 
   private def stop(cause: Throwable): Unit =
     stopWith(CloseInformation(Set(commander), CommandFailed(connect).withCause(cause)), shouldAbort = true)
diff --git a/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala b/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala
index 13da9e6851..78172ff082 100644
--- a/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala
@@ -63,7 +63,7 @@ private[io] class UdpManager(udp: UdpExt)
 
     case s: SimpleSender =>
       val commander = sender() // cache because we create a function that will run asynchly
-      registry => Props(classOf[UdpSender], udp, registry, commander, s.options)
+      registry => Props(classOf[UdpSender], udp, registry, commander, s)
   }
 
 }
diff --git a/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala b/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala
index bdc3d36fa6..a0bc4f598a 100644
--- a/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala
@@ -13,14 +13,15 @@
 
 package org.apache.pekko.io
 
+import java.nio.channels.DatagramChannel
+
 import scala.annotation.nowarn
-import scala.collection.immutable
 import scala.util.control.NonFatal
 
 import org.apache.pekko
 import pekko.actor._
 import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
-import pekko.io.Inet.{ DatagramChannelCreator, SocketOption }
+import pekko.io.Inet.DatagramChannelCreator
 import pekko.io.Udp._
 
 /**
@@ -31,29 +32,46 @@ private[io] class UdpSender(
     val udp: UdpExt,
     channelRegistry: ChannelRegistry,
     commander: ActorRef,
-    options: immutable.Traversable[SocketOption])
+    simpleSender: SimpleSender)
     extends Actor
     with ActorLogging
     with WithUdpSend
     with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
 
+  private val options = simpleSender.options
+
   val channel = {
-    val datagramChannel = options
-      .collectFirst {
-        case creator: DatagramChannelCreator => creator
-      }
-      .getOrElse(DatagramChannelCreator())
-      .create()
-    datagramChannel.configureBlocking(false)
-    val socket = datagramChannel.socket
-    options.foreach { _.beforeDatagramBind(socket) }
+    var datagramChannel: DatagramChannel = null
+    try {
+      datagramChannel = options
+        .collectFirst {
+          case creator: DatagramChannelCreator => creator
+        }
+        .getOrElse(DatagramChannelCreator())
+        .create()
+      datagramChannel.configureBlocking(false)
+      val socket = datagramChannel.socket
+      options.foreach { _.beforeDatagramBind(socket) }
+      channelRegistry.register(datagramChannel, initialOps = 0)
 
-    datagramChannel
+      datagramChannel
+    } catch {
+      case NonFatal(e) =>
+        if ((datagramChannel ne null) && datagramChannel.isOpen) {
+          try datagramChannel.close()
+          catch {
+            case NonFatal(closeError) => log.debug("Error closing DatagramChannel: {}", closeError)
+          }
+        }
+        commander ! CommandFailed(simpleSender)
+        log.debug("Failed to create UDP simple sender: {}", e)
+        context.stop(self)
+        null
+    }
   }
-  channelRegistry.register(channel, initialOps = 0)
 
   def receive: Receive = {
-    case registration: ChannelRegistration =>
+    case registration: ChannelRegistration if channel ne null =>
       options.foreach {
         case v2: Inet.SocketOptionV2 => v2.afterConnect(channel.socket)
         case _                       =>
@@ -62,7 +80,7 @@ private[io] class UdpSender(
       context.become(sendHandlers(registration))
   }
 
-  override def postStop(): Unit = if (channel.isOpen) {
+  override def postStop(): Unit = if ((channel ne null) && channel.isOpen) {
     log.debug("Closing DatagramChannel after being stopped")
     try channel.close()
     catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to