This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1e1b3635cbef [SPARK-55653][K8S] Support `NetworkPolicy` for Spark
executor pods
1e1b3635cbef is described below
commit 1e1b3635cbefa426e39ccf91581b4e7a1cc4712d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Feb 24 07:02:27 2026 -0800
[SPARK-55653][K8S] Support `NetworkPolicy` for Spark executor pods
### What changes were proposed in this pull request?
This PR aims to support `NetworkPolicy` for Spark executor pods.
### Why are the changes needed?
`NetworkPolicy` is frequently used in the production to isolate Spark
applications.
- https://kubernetes.io/docs/concepts/services-networking/network-policies/
### Does this PR introduce _any_ user-facing change?
This is a security feature to make Spark K8s executor pods access only from
the pods with the same application ID.
There are two ways if a user wants to access the executor pods from outside.
1. Use a pod with the same application ID with the target Spark
applications.
2. Submit a Spark job with the following configuration.
```
spark.kubernetes.driver.pod.excludedFeatureSteps=org.apache.spark.deploy.k8s.features.NetworkPolicyFeatureStep
```
### How was this patch tested?
Pass the CIs with the newly added test suite.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: `Gemini 3.1 Pro (High)` on `Antigravity`
Closes #54442 from dongjoon-hyun/SPARK-55653.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
docs/core-migration-guide.md | 2 +
.../k8s/features/NetworkPolicyFeatureStep.scala | 60 ++++++++++++++++++++++
.../k8s/submit/KubernetesDriverBuilder.scala | 1 +
.../features/NetworkPolicyFeatureStepSuite.scala | 60 ++++++++++++++++++++++
4 files changed, 123 insertions(+)
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 63945c5aadc0..b44a2a4ddd0e 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -26,6 +26,8 @@ license: |
- Since Spark 4.2, Spark will allocate executor pods with a batch size of
`20`. To restore the legacy behavior, you can set
`spark.kubernetes.allocation.batch.size` to `10`.
+- Since Spark 4.2, Spark configures a `NetworkPolicy` by default so that
executor pods only accept ingress traffic from the driver and peer executors
within the same job. To disable this and restore the legacy behavior, set
`spark.kubernetes.driver.pod.excludedFeatureSteps` to
`org.apache.spark.deploy.k8s.features.NetworkPolicyFeatureStep`.
+
## Upgrading from Core 4.0 to 4.1
- Since Spark 4.1, Spark Master daemon provides REST API by default. To
restore the behavior before Spark 4.1, you can set `spark.master.rest.enabled`
to `false`.
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStep.scala
new file mode 100644
index 000000000000..e6d53ca9438f
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStep.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder
+
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * A feature step that configures a NetworkPolicy for Spark executors.
+ * It restricts ingress traffic to executors so that they only accept
connections
+ * from the driver and other executors within the same Spark application.
+ */
+private[spark] class NetworkPolicyFeatureStep(conf: KubernetesDriverConf)
+ extends KubernetesFeatureConfigStep {
+ private val policyName = conf.resourceNamePrefix + "-policy"
+
+ override def configurePod(pod: SparkPod): SparkPod = pod
+
+ override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+ val appId = conf.appId
+ val policy = new NetworkPolicyBuilder()
+ .withNewMetadata()
+ .withName(policyName)
+ .withNamespace(conf.namespace)
+ .addToLabels(SPARK_APP_ID_LABEL, appId)
+ .endMetadata()
+ .withNewSpec()
+ .withNewPodSelector()
+ .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToMatchLabels(SPARK_APP_ID_LABEL, appId)
+ .endPodSelector()
+ .addNewIngress()
+ .addNewFrom()
+ .withNewPodSelector()
+ .addToMatchLabels(SPARK_APP_ID_LABEL, appId)
+ .endPodSelector()
+ .endFrom()
+ .endIngress()
+ .endSpec()
+ .build();
+ Seq(policy)
+ }
+}
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index da234762ea1d..f50051de737d 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -76,6 +76,7 @@ class KubernetesDriverBuilder {
new BasicDriverFeatureStep(conf),
new DriverKubernetesCredentialsFeatureStep(conf),
new DriverServiceFeatureStep(conf),
+ new NetworkPolicyFeatureStep(conf),
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStepSuite.scala
new file mode 100644
index 000000000000..f75969ded30b
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/NetworkPolicyFeatureStepSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+
+class NetworkPolicyFeatureStepSuite extends SparkFunSuite {
+
+ test("NetworkPolicy creation") {
+ val conf = KubernetesTestConf.createDriverConf(sparkConf = new
SparkConf(false))
+ val step = new NetworkPolicyFeatureStep(conf)
+
+ // configures pod identically
+ val pod = SparkPod.initialPod()
+ assert(step.configurePod(pod) === pod)
+
+ // additional pod system properties is empty
+ assert(step.getAdditionalPodSystemProperties().isEmpty)
+
+ // Check additional resources
+ val resources = step.getAdditionalKubernetesResources()
+ assert(resources.size === 1)
+
+ val policy = resources.head.asInstanceOf[NetworkPolicy]
+ assert(policy.getMetadata.getName === s"${conf.resourceNamePrefix}-policy")
+ assert(policy.getMetadata.getNamespace === conf.namespace)
+ assert(policy.getMetadata.getLabels.get(SPARK_APP_ID_LABEL) === conf.appId)
+
+ val labels = policy.getSpec.getPodSelector.getMatchLabels
+ assert(labels.get(SPARK_ROLE_LABEL) === SPARK_POD_EXECUTOR_ROLE)
+ assert(labels.get(SPARK_APP_ID_LABEL) === conf.appId)
+
+ val ingress = policy.getSpec.getIngress.asScala
+ assert(ingress.size === 1)
+
+ val from = ingress.head.getFrom.asScala
+ assert(from.size === 1)
+ assert(from.head.getPodSelector.getMatchLabels.get(SPARK_APP_ID_LABEL) ===
conf.appId)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]