This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f2afca  [SPARK-55085] Support `NetworkPolicy` for `SparkApplication`
5f2afca is described below

commit 5f2afca5454b4445865e81424313169dd0490e82
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Jan 17 22:51:59 2026 +0900

    [SPARK-55085] Support `NetworkPolicy` for `SparkApplication`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `NetworkPolicy` for `SparkApplication`.
    
    ### Why are the changes needed?
    
    To enhance the security of `SparkApplication` executor pods.
    
    `NetworkPolicy` is frequently used in production to isolate Spark 
Applications.
    
    - https://kubernetes.io/docs/concepts/services-networking/network-policies/
    
    ### Does this PR introduce _any_ user-facing change?
    
    The executor pods of `SparkApplication` will allow ingress from the 
driver or executors of the same `SparkApplication`.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #465 from dongjoon-hyun/SPARK-55085.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../templates/operator-rbac.yaml                   |  1 +
 .../spark/k8s/operator/SparkAppResourceSpec.java   | 33 +++++++++++++
 .../k8s/operator/SparkAppResourceSpecTest.java     | 55 +++++++++++++++++++++-
 3 files changed, 88 insertions(+), 1 deletion(-)

diff --git 
a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml 
b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
index 55840c4..59d155b 100644
--- a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
@@ -50,6 +50,7 @@ rules:
       - "networking.k8s.io"
     resources:
       - ingresses
+      - networkpolicies
     verbs:
       - '*'
   - apiGroups:
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
index 51b305f..14644e4 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
@@ -29,6 +29,8 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
 import lombok.Getter;
 
 import org.apache.spark.deploy.k8s.Config;
@@ -96,6 +98,7 @@ public class SparkAppResourceSpec {
         KubernetesClientUtils.buildConfigMapJava(
             kubernetesDriverConf.configMapNameDriver(), confFilesMap, 
Map.of()));
     
this.driverPreResources.addAll(ConfigMapSpecUtils.buildConfigMaps(configMapSpecs));
+    
this.driverPreResources.add(buildNetworkPolicy(kubernetesDriverConf.appId(), 
namespace));
     this.driverResources.addAll(configureDriverServerIngress(sparkPod, 
driverServiceIngressList));
     this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace));
     this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace));
@@ -165,4 +168,34 @@ public class SparkAppResourceSpec {
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  /**
+   * Builds the NetworkPolicy for the SparkApplication.
+   *
+   * @param appId     The application ID of the SparkApplication.
+   * @param namespace The namespace of the SparkApplication.
+   * @return A NetworkPolicy object.
+   */
+  private NetworkPolicy buildNetworkPolicy(String appId, String namespace) {
+    return new NetworkPolicyBuilder()
+        .withNewMetadata()
+        .withName(appId + "-network-policy")
+        .withNamespace(namespace)
+        
.addToLabels(org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME,
 appId)
+        .endMetadata()
+        .withNewSpec()
+        .withNewPodSelector()
+        .addToMatchLabels(Constants.SPARK_ROLE_LABEL(), 
Constants.SPARK_POD_EXECUTOR_ROLE())
+        .addToMatchLabels(Constants.SPARK_APP_ID_LABEL(), appId)
+        .endPodSelector()
+        .addNewIngress()
+        .addNewFrom()
+        .withNewPodSelector()
+        .addToMatchLabels(Constants.SPARK_APP_ID_LABEL(), appId)
+        .endPodSelector()
+        .endFrom()
+        .endIngress()
+        .endSpec()
+        .build();
+  }
 }
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
index 0dfc019..2eee28d 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
@@ -36,6 +36,8 @@ import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.Volume;
 import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -72,7 +74,7 @@ class SparkAppResourceSpecTest {
         new SparkAppResourceSpec(mockConf, mockSpec, List.of(), List.of());
 
     Assertions.assertEquals(2, appResourceSpec.getDriverResources().size());
-    Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+    Assertions.assertEquals(2, appResourceSpec.getDriverPreResources().size());
     Assertions.assertEquals(Pod.class, 
appResourceSpec.getDriverResources().get(0).getClass());
     Assertions.assertEquals(
         ConfigMap.class, 
appResourceSpec.getDriverResources().get(1).getClass());
@@ -110,6 +112,57 @@ class SparkAppResourceSpecTest {
     Assertions.assertEquals(proposedConfigVolume.getName(), 
proposedConfigVolumeMount.getName());
   }
 
+  @Test
+  void testNetworkPolicy() {
+    SparkAppDriverConf mockConf = mock(SparkAppDriverConf.class);
+    when(mockConf.appId()).thenReturn("app1");
+    when(mockConf.configMapNameDriver()).thenReturn("foo-network-policy");
+    when(mockConf.sparkConf())
+        .thenReturn(new SparkConf().set("spark.kubernetes.namespace", 
"foo-namespace"));
+
+    KubernetesDriverSpec mockSpec = mock(KubernetesDriverSpec.class);
+    Pod driver = buildBasicPod("driver");
+    SparkPod sparkPod = new SparkPod(driver, buildBasicContainer());
+
+    // Add some mock resources and pre-resources
+    Seq<HasMetadata> empty = 
CollectionConverters.asScala(List.<HasMetadata>of()).toList();
+    when(mockSpec.driverKubernetesResources()).thenReturn(empty);
+    when(mockSpec.driverPreKubernetesResources()).thenReturn(empty);
+    when(mockSpec.pod()).thenReturn(sparkPod);
+    when(mockSpec.systemProperties()).thenReturn(new HashMap<>());
+
+    SparkAppResourceSpec appResourceSpec =
+        new SparkAppResourceSpec(mockConf, mockSpec, List.of(), List.of());
+
+    Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+    Assertions.assertEquals(NetworkPolicy.class,
+        appResourceSpec.getDriverPreResources().get(0).getClass());
+
+    NetworkPolicy expected = new NetworkPolicyBuilder()
+        .withNewMetadata()
+        .withName("app1-network-policy")
+        .withNamespace("foo-namespace")
+        .addToLabels(Constants.LABEL_SPARK_APPLICATION_NAME, "app1")
+        .endMetadata()
+        .withNewSpec()
+        .withNewPodSelector()
+        .addToMatchLabels(Constants.LABEL_SPARK_ROLE_NAME,
+            Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE)
+        .addToMatchLabels("spark-app-selector", "app1")
+        .endPodSelector()
+        .addNewIngress()
+        .addNewFrom()
+        .withNewPodSelector()
+        .addToMatchLabels("spark-app-selector", "app1")
+        .endPodSelector()
+        .endFrom()
+        .endIngress()
+        .endSpec()
+        .build();
+    Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+    Assertions.assertEquals(expected, 
appResourceSpec.getDriverPreResources().get(0));
+  }
+
   protected Container buildBasicContainer() {
     return new ContainerBuilder()
         .withName("foo-container")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to