This is an automated email from the ASF dual-hosted git repository.
He-Pin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 8116ff7c5e feat: support SO_REUSEPORT socket option (#2915)
8116ff7c5e is described below
commit 8116ff7c5e9fa0039c0cd67e5e6870e8ac2a632d
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Fri May 8 21:03:18 2026 +0800
feat: support SO_REUSEPORT socket option (#2915)
* feat: support SO_REUSEPORT socket option
* Fix license header in InetSocketOptionSpec.scala
The new test file incorrectly used the derived-from-Akka header with
Lightbend copyright. Since this is a brand new file created for the
SO_REUSEPORT feature, it should use the standard Apache-only license
header like other new files in the project.
🤖 Generated with [Qoder](https://qoder.com)
* Address SO_REUSEPORT review feedback
---
.../org/apache/pekko/io/InetSocketOptionSpec.scala | 82 ++++++++++++++++++++++
.../org/apache/pekko/io/TcpIntegrationSpec.scala | 18 ++++-
.../org/apache/pekko/io/UdpIntegrationSpec.scala | 14 ++++
.../src/main/scala/org/apache/pekko/io/Inet.scala | 39 ++++++++++
.../apache/pekko/io/TcpOutgoingConnection.scala | 10 +--
.../scala/org/apache/pekko/io/UdpManager.scala | 2 +-
.../main/scala/org/apache/pekko/io/UdpSender.scala | 50 ++++++++-----
7 files changed, 193 insertions(+), 22 deletions(-)
diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/InetSocketOptionSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/InetSocketOptionSpec.scala
new file mode 100644
index 0000000000..1825cfd1eb
--- /dev/null
+++ b/actor-tests/src/test/scala/org/apache/pekko/io/InetSocketOptionSpec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.io
+
+import java.net.StandardSocketOptions
+import java.nio.channels.DatagramChannel
+import java.nio.channels.NetworkChannel
+import java.nio.channels.ServerSocketChannel
+import java.nio.channels.SocketChannel
+
+import org.apache.pekko
+import pekko.testkit.PekkoSpec
+
+class InetSocketOptionSpec extends PekkoSpec {
+
+ "Inet.SO.ReusePort" must {
+
+ "set SO_REUSEPORT on TCP server sockets before bind" in {
+ val channel = ServerSocketChannel.open()
+ try {
+ pendingIfReusePortUnsupported(channel)
+
+ Inet.SO.ReusePort(true).beforeServerSocketBind(channel.socket)
+
+ channel.getOption(StandardSocketOptions.SO_REUSEPORT).booleanValue
should ===(true)
+ } finally {
+ channel.close()
+ }
+ }
+
+ "set SO_REUSEPORT on TCP client sockets before connect" in {
+ val channel = SocketChannel.open()
+ try {
+ pendingIfReusePortUnsupported(channel)
+
+ Tcp.SO.ReusePort(true).beforeConnect(channel.socket)
+
+ channel.getOption(StandardSocketOptions.SO_REUSEPORT).booleanValue
should ===(true)
+ } finally {
+ channel.close()
+ }
+ }
+
+ "set SO_REUSEPORT on UDP sockets before bind" in {
+ val channel = DatagramChannel.open()
+ try {
+ pendingIfReusePortUnsupported(channel)
+
+ Udp.SO.ReusePort(true).beforeDatagramBind(channel.socket)
+
+ channel.getOption(StandardSocketOptions.SO_REUSEPORT).booleanValue
should ===(true)
+ } finally {
+ channel.close()
+ }
+ }
+
+ "be available through the Java TCP and UDP socket option APIs" in {
+ TcpSO.reusePort(true) should ===(Inet.SO.ReusePort(true))
+ UdpSO.reusePort(false) should ===(Inet.SO.ReusePort(false))
+ }
+ }
+
+ private def pendingIfReusePortUnsupported(channel: NetworkChannel): Unit =
+ if
(!channel.supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT)) {
+ pending
+ }
+}
diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala
index f7e3b857fe..e4de5a636e 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpec.scala
@@ -14,7 +14,7 @@
package org.apache.pekko.io
import java.io.IOException
-import java.net.{ InetSocketAddress, ServerSocket }
+import java.net.{ InetSocketAddress, ServerSocket, Socket }
import scala.concurrent.duration._
@@ -184,6 +184,22 @@ class TcpIntegrationSpec extends PekkoSpec("""
replies should ===(Nil)
}
+ "reply with CommandFailed when a connect socket option fails before
connect" in {
+ val connectCommander = TestProbe()
+ val failure = new UnsupportedOperationException("boom")
+ val failingOption = new Inet.SocketOption {
+ override def beforeConnect(s: Socket): Unit = throw failure
+ }
+ val endpoint = new InetSocketAddress("127.0.0.1", 1)
+ val command = Connect(endpoint, options = List(failingOption))
+
+ connectCommander.send(IO(Tcp), command)
+
+ val commandFailed = connectCommander.expectMsgType[CommandFailed]
+ commandFailed.cmd should ===(command)
+ commandFailed.cause should ===(Some(failure))
+ }
+
"handle tcp connection actor death properly" in new
TestSetup(shouldBindServer = false) {
val serverSocket = new ServerSocket(endpoint.getPort(), 100,
endpoint.getAddress())
val connectCommander = TestProbe()
diff --git a/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala
index 957b7828e8..9a20d7bd9a 100644
--- a/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala
+++ b/actor-tests/src/test/scala/org/apache/pekko/io/UdpIntegrationSpec.scala
@@ -129,6 +129,15 @@ class UdpIntegrationSpec extends PekkoSpec("""
commander.expectMsgType[Bound]
assert(assertOption.openCalled === 1)
}
+
+ "reply with CommandFailed when a simple sender socket option fails before
registration" in {
+ val commander = TestProbe()
+ val command = SimpleSender(options = List(FailBeforeDatagramBind()))
+
+ commander.send(IO(Udp), command)
+
+ commander.expectMsg(CommandFailed(command))
+ }
}
}
@@ -162,3 +171,8 @@ private case class AssertOpenDatagramChannel() extends DatagramChannelCreator {
super.create()
}
}
+
+private case class FailBeforeDatagramBind() extends SocketOption {
+ override def beforeDatagramBind(ds: DatagramSocket): Unit =
+ throw new UnsupportedOperationException("boom")
+}
diff --git a/actor/src/main/scala/org/apache/pekko/io/Inet.scala b/actor/src/main/scala/org/apache/pekko/io/Inet.scala
index ccf28a98bd..26648029b1 100644
--- a/actor/src/main/scala/org/apache/pekko/io/Inet.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/Inet.scala
@@ -16,12 +16,20 @@ package org.apache.pekko.io
import java.net.DatagramSocket
import java.net.ServerSocket
import java.net.Socket
+import java.net.StandardSocketOptions
import java.nio.channels.DatagramChannel
+import java.nio.channels.NetworkChannel
import scala.annotation.nowarn
object Inet {
+ private def setReusePort(channel: NetworkChannel, on: Boolean): Unit =
+ if (channel eq null)
+ throw new UnsupportedOperationException("SO_REUSEPORT can only be set on
channel-backed sockets")
+ else
+ channel.setOption(StandardSocketOptions.SO_REUSEPORT,
java.lang.Boolean.valueOf(on))
+
/**
* SocketOption is a package of data (from the user) and associated
* behavior (how to apply that to a channel).
@@ -126,6 +134,19 @@ object Inet {
override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on)
}
+ /**
+ * [[pekko.io.Inet.SocketOption]] to enable or disable SO_REUSEPORT
+ *
+ * For more information see [[java.net.StandardSocketOptions#SO_REUSEPORT]]
+ *
+ * @since 2.0.0
+ */
+ final case class ReusePort(on: Boolean) extends SocketOption {
+ override def beforeServerSocketBind(s: ServerSocket): Unit =
setReusePort(s.getChannel, on)
+ override def beforeDatagramBind(s: DatagramSocket): Unit =
setReusePort(s.getChannel, on)
+ override def beforeConnect(s: Socket): Unit = setReusePort(s.getChannel,
on)
+ }
+
/**
* [[pekko.io.Inet.SocketOption]] to set the SO_SNDBUF option.
*
@@ -166,6 +187,15 @@ object Inet {
*/
val ReuseAddress = SO.ReuseAddress
+ /**
+ * [[pekko.io.Inet.SocketOption]] to enable or disable SO_REUSEPORT
+ *
+ * For more information see [[java.net.StandardSocketOptions#SO_REUSEPORT]]
+ *
+ * @since 2.0.0
+ */
+ def ReusePort = SO.ReusePort
+
/**
* [[pekko.io.Inet.SocketOption]] to set the SO_SNDBUF option.
*
@@ -200,6 +230,15 @@ object Inet {
*/
def reuseAddress(on: Boolean) = ReuseAddress(on)
+ /**
+ * [[pekko.io.Inet.SocketOption]] to enable or disable SO_REUSEPORT
+ *
+ * For more information see [[java.net.StandardSocketOptions#SO_REUSEPORT]]
+ *
+ * @since 2.0.0
+ */
+ def reusePort(on: Boolean) = ReusePort(on)
+
/**
* [[pekko.io.Inet.SocketOption]] to set the SO_SNDBUF option.
*
diff --git a/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala b/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala
index f257eceb85..4982f7061a 100644
--- a/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/TcpOutgoingConnection.scala
@@ -50,10 +50,12 @@ private[io] class TcpOutgoingConnection(
signDeathPact(commander)
- options.foreach(_.beforeConnect(channel.socket))
- localAddress.foreach(channel.socket.bind)
- channelRegistry.register(channel, 0)
- timeout.foreach(context.setReceiveTimeout) // Initiate connection timeout if
supplied
+ reportConnectFailure {
+ options.foreach(_.beforeConnect(channel.socket))
+ localAddress.foreach(channel.socket.bind)
+ channelRegistry.register(channel, 0)
+ timeout.foreach(context.setReceiveTimeout) // Initiate connection timeout
if supplied
+ }
private def stop(cause: Throwable): Unit =
stopWith(CloseInformation(Set(commander),
CommandFailed(connect).withCause(cause)), shouldAbort = true)
diff --git a/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala b/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala
index 13da9e6851..78172ff082 100644
--- a/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/UdpManager.scala
@@ -63,7 +63,7 @@ private[io] class UdpManager(udp: UdpExt)
case s: SimpleSender =>
val commander = sender() // cache because we create a function that will
run asynchly
- registry => Props(classOf[UdpSender], udp, registry, commander,
s.options)
+ registry => Props(classOf[UdpSender], udp, registry, commander, s)
}
}
diff --git a/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala b/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala
index bdc3d36fa6..a0bc4f598a 100644
--- a/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala
+++ b/actor/src/main/scala/org/apache/pekko/io/UdpSender.scala
@@ -13,14 +13,15 @@
package org.apache.pekko.io
+import java.nio.channels.DatagramChannel
+
import scala.annotation.nowarn
-import scala.collection.immutable
import scala.util.control.NonFatal
import org.apache.pekko
import pekko.actor._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
-import pekko.io.Inet.{ DatagramChannelCreator, SocketOption }
+import pekko.io.Inet.DatagramChannelCreator
import pekko.io.Udp._
/**
@@ -31,29 +32,46 @@ private[io] class UdpSender(
val udp: UdpExt,
channelRegistry: ChannelRegistry,
commander: ActorRef,
- options: immutable.Traversable[SocketOption])
+ simpleSender: SimpleSender)
extends Actor
with ActorLogging
with WithUdpSend
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
+ private val options = simpleSender.options
+
val channel = {
- val datagramChannel = options
- .collectFirst {
- case creator: DatagramChannelCreator => creator
- }
- .getOrElse(DatagramChannelCreator())
- .create()
- datagramChannel.configureBlocking(false)
- val socket = datagramChannel.socket
- options.foreach { _.beforeDatagramBind(socket) }
+ var datagramChannel: DatagramChannel = null
+ try {
+ datagramChannel = options
+ .collectFirst {
+ case creator: DatagramChannelCreator => creator
+ }
+ .getOrElse(DatagramChannelCreator())
+ .create()
+ datagramChannel.configureBlocking(false)
+ val socket = datagramChannel.socket
+ options.foreach { _.beforeDatagramBind(socket) }
+ channelRegistry.register(datagramChannel, initialOps = 0)
- datagramChannel
+ datagramChannel
+ } catch {
+ case NonFatal(e) =>
+ if ((datagramChannel ne null) && datagramChannel.isOpen) {
+ try datagramChannel.close()
+ catch {
+ case NonFatal(closeError) => log.debug("Error closing
DatagramChannel: {}", closeError)
+ }
+ }
+ commander ! CommandFailed(simpleSender)
+ log.debug("Failed to create UDP simple sender: {}", e)
+ context.stop(self)
+ null
+ }
}
- channelRegistry.register(channel, initialOps = 0)
def receive: Receive = {
- case registration: ChannelRegistration =>
+ case registration: ChannelRegistration if channel ne null =>
options.foreach {
case v2: Inet.SocketOptionV2 => v2.afterConnect(channel.socket)
case _ =>
@@ -62,7 +80,7 @@ private[io] class UdpSender(
context.become(sendHandlers(registration))
}
- override def postStop(): Unit = if (channel.isOpen) {
+ override def postStop(): Unit = if ((channel ne null) && channel.isOpen) {
log.debug("Closing DatagramChannel after being stopped")
try channel.close()
catch {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]