This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 541fbb2  [SPARK-55099] Support `NetworkPolicy` for `SparkCluster`
541fbb2 is described below

commit 541fbb226ee43a5bf210849ec9a6a3cff3051351
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Jan 20 20:39:50 2026 +0900

    [SPARK-55099] Support `NetworkPolicy` for `SparkCluster`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `NetworkPolicy` for `SparkCluster`.
    
    ### Why are the changes needed?
    
    To enhance the security of `SparkCluster` worker pods.
    
    `NetworkPolicy` is frequently used in the production to isolate Spark 
Clusters.
    
    - https://kubernetes.io/docs/concepts/services-networking/network-policies/
    
    ### Does this PR introduce _any_ user-facing change?
    
    The executor pods of `SparkCluster` will allow ingress from the same 
`SparkCluster` master or workers. (Or, an external pod with the label, 
`spark.operator/spark-cluster-name`, whose value is equal to the cluster name)
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #468 from dongjoon-hyun/SPARK-55099.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../k8s/operator/context/SparkClusterContext.java  | 10 +++++++
 .../SparkClusterResourceSpecFactory.java           |  1 +
 .../reconciler/reconcilesteps/ClusterInitStep.java |  3 ++
 .../k8s/operator/SparkClusterResourceSpec.java     | 34 ++++++++++++++++++++++
 .../k8s/operator/SparkClusterResourceSpecTest.java | 17 +++++++++++
 5 files changed, 65 insertions(+)

diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
index 892fed5..88eadae 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscaler;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
 import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -103,6 +104,15 @@ public class SparkClusterContext extends 
BaseContext<SparkCluster> {
     return getSecondaryResourceSpec().getWorkerStatefulSet();
   }
 
+  /**
+   * Returns the specification for the worker NetworkPolicy.
+   *
+   * @return The NetworkPolicy object for the workers.
+   */
+  public NetworkPolicy getWorkerNetworkPolicySpec() {
+    return getSecondaryResourceSpec().getWorkerNetworkPolicy();
+  }
+
   /**
    * Returns the specification for the HorizontalPodAutoscaler, if present.
    *
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
index 9a394fd..161234c 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
@@ -51,6 +51,7 @@ public final class SparkClusterResourceSpecFactory {
     decorator.decorate(spec.getWorkerService());
     decorator.decorate(spec.getMasterStatefulSet());
     decorator.decorate(spec.getWorkerStatefulSet());
+    decorator.decorate(spec.getWorkerNetworkPolicy());
     if (spec.getHorizontalPodAutoscaler().isPresent()) {
       decorator.decorate(spec.getHorizontalPodAutoscaler().get());
     }
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
index 443760c..b3b38c4 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
@@ -31,6 +31,7 @@ import java.time.Instant;
 
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.spark.k8s.operator.SparkCluster;
@@ -75,6 +76,8 @@ public class ClusterInitStep extends ClusterReconcileStep {
       
context.getClient().apps().statefulSets().resource(masterStatefulSet).create();
       StatefulSet workerStatefulSet = context.getWorkerStatefulSetSpec();
       
context.getClient().apps().statefulSets().resource(workerStatefulSet).create();
+      NetworkPolicy workerNetworkPolicy = context.getWorkerNetworkPolicySpec();
+      
context.getClient().network().networkPolicies().resource(workerNetworkPolicy).create();
       var horizontalPodAutoscaler = context.getHorizontalPodAutoscalerSpec();
       if (horizontalPodAutoscaler.isPresent()) {
         context
diff --git 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index 8962aaf..57bbb73 100644
--- 
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++ 
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -38,6 +38,8 @@ import 
io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerBui
 import 
io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpec;
 import 
io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpecBuilder;
 import io.fabric8.kubernetes.api.model.autoscaling.v2.MetricSpecBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
 import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
 import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
 import lombok.Getter;
@@ -54,6 +56,7 @@ public class SparkClusterResourceSpec {
   @Getter private final Service workerService;
   @Getter private final StatefulSet masterStatefulSet;
   @Getter private final StatefulSet workerStatefulSet;
+  @Getter private final NetworkPolicy workerNetworkPolicy;
   @Getter private final Optional<HorizontalPodAutoscaler> 
horizontalPodAutoscaler;
   @Getter private final Optional<PodDisruptionBudget> podDisruptionBudget;
 
@@ -114,6 +117,7 @@ public class SparkClusterResourceSpec {
             workerSpec.getStatefulSetSpec());
     horizontalPodAutoscaler = buildHorizontalPodAutoscaler(clusterName, 
namespace, spec);
     podDisruptionBudget = buildPodDisruptionBudget(clusterName, namespace, 
spec);
+    workerNetworkPolicy = buildWorkerNetworkPolicy(clusterName, namespace);
   }
 
   /**
@@ -456,4 +460,34 @@ public class SparkClusterResourceSpec {
             .endSpec()
             .build());
   }
+
+  /**
+   * Builds the NetworkPolicy for the SparkCluster.
+   *
+   * @param clusterName The name of the SparkCluster.
+   * @param namespace The namespace of the SparkApplication.
+   * @return A NetworkPolicy object.
+   */
+  private NetworkPolicy buildWorkerNetworkPolicy(String clusterName, String 
namespace) {
+    return new NetworkPolicyBuilder()
+        .withNewMetadata()
+        .withName(clusterName + "-worker")
+        .withNamespace(namespace)
+        .addToLabels(LABEL_SPARK_CLUSTER_NAME, clusterName)
+        .endMetadata()
+        .withNewSpec()
+        .withNewPodSelector()
+        .addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+        .addToMatchLabels(LABEL_SPARK_CLUSTER_NAME, clusterName)
+        .endPodSelector()
+        .addNewIngress()
+        .addNewFrom()
+        .withNewPodSelector()
+        .addToMatchLabels(LABEL_SPARK_CLUSTER_NAME, clusterName)
+        .endPodSelector()
+        .endFrom()
+        .endIngress()
+        .endSpec()
+        .build();
+  }
 }
diff --git 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index 675365c..63c4171 100644
--- 
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++ 
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
 import io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -316,6 +317,22 @@ class SparkClusterResourceSpecTest {
             .get(LABEL_SPARK_VERSION_NAME));
   }
 
+  @Test
+  void testWorkerNetworkPolicy() {
+    SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new 
SparkConf());
+    NetworkPolicy policy = spec.getWorkerNetworkPolicy();
+    assertEquals("my-namespace", policy.getMetadata().getNamespace());
+    assertEquals("cluster-name-worker", policy.getMetadata().getName());
+    var expected = Map.of(
+        LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE,
+        LABEL_SPARK_CLUSTER_NAME, "cluster-name");
+    assertEquals(expected, policy.getSpec().getPodSelector().getMatchLabels());
+    assertTrue(policy.getSpec().getEgress().isEmpty());
+    assertEquals(1, policy.getSpec().getIngress().size());
+    assertEquals(Map.of(LABEL_SPARK_CLUSTER_NAME, "cluster-name"),
+        
policy.getSpec().getIngress().get(0).getFrom().get(0).getPodSelector().getMatchLabels());
+  }
+
   @Test
   void testEmptyHorizontalPodAutoscalerByDefault() {
     SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new 
SparkConf());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to