This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ae4ab7ead2 [SPARK-53999][CORE] Native KQueue Transport support on BSD/MacOS
c9ae4ab7ead2 is described below

commit c9ae4ab7ead2482fe13955fb6d3ba777755a07b6
Author: Kent Yao <[email protected]>
AuthorDate: Thu Oct 23 10:36:52 2025 -0700

    [SPARK-53999][CORE] Native KQueue Transport support on BSD/MacOS
    
    ### What changes were proposed in this pull request?
    This PR adds native KQUEUE (via JNI) support for transport procedures such as shuffle, file, and RPC.
    
    ### Why are the changes needed?
    
    Feature parity between Linux and MacOS/BSD platforms
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `KQUEUE` is added as a new option for io.mode (for example, `spark.shuffle.io.mode`).
    
    ### How was this patch tested?
    New unit tests (`ShuffleNettyNioSuite`, `ShuffleNettyEpollSuite`, and `ShuffleNettyKQueueSuite`).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #52703 from yaooqinn/SPARK-53999.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../java/org/apache/spark/network/util/IOMode.java | 15 ++++++++--
 .../org/apache/spark/network/util/NettyUtils.java  |  6 ++++
 .../scala/org/apache/spark/ShuffleNettySuite.scala | 32 ++++++++++++++++++++--
 3 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
index 6b208d95bbfb..6ab401b9a0d5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java
@@ -19,9 +19,18 @@ package org.apache.spark.network.util;
 
 /**
  * Selector for which form of low-level IO we should use.
- * NIO is always available, while EPOLL is only available on Linux.
- * AUTO is used to select EPOLL if it's available, or NIO otherwise.
  */
 public enum IOMode {
-  NIO, EPOLL
+  /**
+   * Java NIO (Selector), cross-platform portable
+   */
+  NIO,
+  /**
+   * Native EPOLL via JNI, Linux only
+   */
+  EPOLL,
+  /**
+   * Native KQUEUE via JNI, MacOS/BSD only
+   */
+  KQUEUE
 }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
index 2dd1c8f2e4a7..da4b3109bbe1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java
@@ -26,6 +26,9 @@ import io.netty.channel.ServerChannel;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueServerSocketChannel;
+import io.netty.channel.kqueue.KQueueSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -68,6 +71,7 @@ public class NettyUtils {
     return switch (mode) {
       case NIO -> new NioEventLoopGroup(numThreads, threadFactory);
       case EPOLL -> new EpollEventLoopGroup(numThreads, threadFactory);
+      case KQUEUE ->  new KQueueEventLoopGroup(numThreads, threadFactory);
     };
   }
 
@@ -76,6 +80,7 @@ public class NettyUtils {
     return switch (mode) {
       case NIO -> NioSocketChannel.class;
       case EPOLL -> EpollSocketChannel.class;
+      case KQUEUE -> KQueueSocketChannel.class;
     };
   }
 
@@ -84,6 +89,7 @@ public class NettyUtils {
     return switch (mode) {
       case NIO -> NioServerSocketChannel.class;
       case EPOLL -> EpollServerSocketChannel.class;
+      case KQUEUE -> KQueueServerSocketChannel.class;
     };
   }
 
diff --git a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
index 378a36184513..18a8453d60be 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala
@@ -17,14 +17,42 @@
 
 package org.apache.spark
 
-import org.scalatest.BeforeAndAfterAll
+import org.scalactic.source.Position
+import org.scalatest.{BeforeAndAfterAll, Tag}
 
-class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
+import org.apache.spark.network.util.IOMode
+import org.apache.spark.util.Utils
+
+abstract class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
 
   // This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
 
+  def ioMode: IOMode = IOMode.NIO
+  def shouldRunTests: Boolean = true
   override def beforeAll(): Unit = {
     super.beforeAll()
     conf.set("spark.shuffle.blockTransferService", "netty")
+    conf.set("spark.shuffle.io.mode", ioMode.toString)
+  }
+
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)(
+    implicit pos: Position): Unit = {
+    if (!shouldRunTests) {
+      ignore(s"$testName [disabled on ${Utils.osName} with $ioMode]")(testBody)
+    } else {
+      super.test(testName, testTags: _*) {testBody}
+    }
   }
 }
+
+class ShuffleNettyNioSuite extends ShuffleNettySuite
+
+class ShuffleNettyEpollSuite extends ShuffleNettySuite {
+  override def shouldRunTests: Boolean = Utils.isLinux
+  override def ioMode: IOMode = IOMode.EPOLL
+}
+
+class ShuffleNettyKQueueSuite extends ShuffleNettySuite {
+  override def shouldRunTests: Boolean = Utils.isMac
+  override def ioMode: IOMode = IOMode.KQUEUE
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to