This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e1b3635cbef [SPARK-55653][K8S] Support `NetworkPolicy` for Spark 
executor pods
1e1b3635cbef is described below

commit 1e1b3635cbefa426e39ccf91581b4e7a1cc4712d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Feb 24 07:02:27 2026 -0800

    [SPARK-55653][K8S] Support `NetworkPolicy` for Spark executor pods
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `NetworkPolicy` for Spark executor pods.
    
    ### Why are the changes needed?
    
    `NetworkPolicy` is frequently used in production to isolate Spark 
applications.
    
    - https://kubernetes.io/docs/concepts/services-networking/network-policies/
    
    ### Does this PR introduce _any_ user-facing change?
    
    This is a security feature that makes Spark K8s executor pods accessible 
only from pods with the same application ID.
    
    There are two ways if a user wants to access the executor pods from outside.
    
    1. Use a pod with the same application ID as the target Spark 
application.
    2. Submit a Spark job with the following configuration.
    ```
    
spark.kubernetes.driver.pod.excludedFeatureSteps=org.apache.spark.deploy.k8s.features.NetworkPolicyFeatureStep
    ```
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test suite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: `Gemini 3.1 Pro (High)` on `Antigravity`
    
    Closes #54442 from dongjoon-hyun/SPARK-55653.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/core-migration-guide.md                       |  2 +
 .../k8s/features/NetworkPolicyFeatureStep.scala    | 60 ++++++++++++++++++++++
 .../k8s/submit/KubernetesDriverBuilder.scala       |  1 +
 .../features/NetworkPolicyFeatureStepSuite.scala   | 60 ++++++++++++++++++++++
 4 files changed, 123 insertions(+)

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 63945c5aadc0..b44a2a4ddd0e 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -26,6 +26,8 @@ license: |
 
 - Since Spark 4.2, Spark will allocate executor pods with a batch size of 
`20`. To restore the legacy behavior, you can set 
`spark.kubernetes.allocation.batch.size` to `10`.
 
+- Since Spark 4.2, Spark configures a `NetworkPolicy` by default so that 
executor pods only accept ingress traffic from the driver and peer executors 
within the same job. To disable this and restore the legacy behavior, set 
`spark.kubernetes.driver.pod.excludedFeatureSteps` to 
`org.apache.spark.deploy.k8s.features.NetworkPolicyFeatureStep`.
+
 ## Upgrading from Core 4.0 to 4.1
 
 - Since Spark 4.1, Spark Master daemon provides REST API by default. To 
restore the behavior before Spark 4.1, you can set `spark.master.rest.enabled` 
to `false`.
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStep.scala
new file mode 100644
index 000000000000..e6d53ca9438f
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStep.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * A feature step that configures a NetworkPolicy for Spark executors.
+ * It restricts ingress traffic to executors so that they only accept 
connections
+ * from the driver and other executors within the same Spark application.
+ */
+private[spark] class NetworkPolicyFeatureStep(conf: KubernetesDriverConf)
+  extends KubernetesFeatureConfigStep {
+  // Policy is named after the resource prefix so it is unique per application.
+  private val policyName = conf.resourceNamePrefix + "-policy"
+
+  // This step only emits an extra resource; the pod spec itself is untouched.
+  override def configurePod(pod: SparkPod): SparkPod = pod
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val appId = conf.appId
+    val policy = new NetworkPolicyBuilder()
+      .withNewMetadata()
+        .withName(policyName)
+        .withNamespace(conf.namespace)
+        .addToLabels(SPARK_APP_ID_LABEL, appId)
+      .endMetadata()
+      .withNewSpec()
+        .withNewPodSelector()
+          .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .addToMatchLabels(SPARK_APP_ID_LABEL, appId)
+          .endPodSelector()
+        .addNewIngress()
+          .addNewFrom()
+            .withNewPodSelector()
+              .addToMatchLabels(SPARK_APP_ID_LABEL, appId)
+              .endPodSelector()
+            .endFrom()
+          .endIngress()
+        .endSpec()
+      .build()
+    Seq(policy)
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index da234762ea1d..f50051de737d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -76,6 +76,7 @@ class KubernetesDriverBuilder {
       new BasicDriverFeatureStep(conf),
       new DriverKubernetesCredentialsFeatureStep(conf),
       new DriverServiceFeatureStep(conf),
+      new NetworkPolicyFeatureStep(conf),
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStepSuite.scala
new file mode 100644
index 000000000000..f75969ded30b
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStepSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+
+class NetworkPolicyFeatureStepSuite extends SparkFunSuite {
+
+  test("NetworkPolicy creation") {
+    val conf = KubernetesTestConf.createDriverConf(sparkConf = new 
SparkConf(false))
+    val step = new NetworkPolicyFeatureStep(conf)
+
+    // configurePod must return the pod unchanged (step is resource-only)
+    val pod = SparkPod.initialPod()
+    assert(step.configurePod(pod) === pod)
+
+    // The step contributes no additional pod system properties
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+
+    // Exactly one additional resource (the NetworkPolicy) is created
+    val resources = step.getAdditionalKubernetesResources()
+    assert(resources.size === 1)
+
+    val policy = resources.head.asInstanceOf[NetworkPolicy]
+    assert(policy.getMetadata.getName === s"${conf.resourceNamePrefix}-policy")
+    assert(policy.getMetadata.getNamespace === conf.namespace)
+    assert(policy.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) === conf.appId)
+
+    val labels = policy.getSpec.getPodSelector.getMatchLabels
+    assert(labels.get(SPARK_ROLE_LABEL) === SPARK_POD_EXECUTOR_ROLE)
+    assert(labels.get(SPARK_APP_ID_LABEL) === conf.appId)
+
+    val ingress = policy.getSpec.getIngress.asScala
+    assert(ingress.size === 1)
+
+    val from = ingress.head.getFrom.asScala
+    assert(from.size === 1)
+    assert(from.head.getPodSelector.getMatchLabels.get(SPARK_APP_ID_LABEL) === 
conf.appId)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to