This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 541fbb2 [SPARK-55099] Support `NetworkPolicy` for `SparkCluster`
541fbb2 is described below
commit 541fbb226ee43a5bf210849ec9a6a3cff3051351
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Jan 20 20:39:50 2026 +0900
[SPARK-55099] Support `NetworkPolicy` for `SparkCluster`
### What changes were proposed in this pull request?
This PR aims to support `NetworkPolicy` for `SparkCluster`.
### Why are the changes needed?
To enhance the security of `SparkCluster` worker pods.
`NetworkPolicy` is frequently used in production to isolate Spark
Clusters.
- https://kubernetes.io/docs/concepts/services-networking/network-policies/
### Does this PR introduce _any_ user-facing change?
The executor pods of `SparkCluster` will allow ingress from the same
`SparkCluster` master or workers. (Or, an external pod with the label,
`spark.operator/spark-cluster-name`, whose value is equal to the cluster name)
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #468 from dongjoon-hyun/SPARK-55099.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../k8s/operator/context/SparkClusterContext.java | 10 +++++++
.../SparkClusterResourceSpecFactory.java | 1 +
.../reconciler/reconcilesteps/ClusterInitStep.java | 3 ++
.../k8s/operator/SparkClusterResourceSpec.java | 34 ++++++++++++++++++++++
.../k8s/operator/SparkClusterResourceSpecTest.java | 17 +++++++++++
5 files changed, 65 insertions(+)
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
index 892fed5..88eadae 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/context/SparkClusterContext.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscaler;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -103,6 +104,15 @@ public class SparkClusterContext extends
BaseContext<SparkCluster> {
return getSecondaryResourceSpec().getWorkerStatefulSet();
}
+ /**
+ * Returns the specification for the worker NetworkPolicy.
+ *
+ * @return The NetworkPolicy object for the workers.
+ */
+ public NetworkPolicy getWorkerNetworkPolicySpec() {
+ return getSecondaryResourceSpec().getWorkerNetworkPolicy();
+ }
+
/**
* Returns the specification for the HorizontalPodAutoscaler, if present.
*
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
index 9a394fd..161234c 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/SparkClusterResourceSpecFactory.java
@@ -51,6 +51,7 @@ public final class SparkClusterResourceSpecFactory {
decorator.decorate(spec.getWorkerService());
decorator.decorate(spec.getMasterStatefulSet());
decorator.decorate(spec.getWorkerStatefulSet());
+ decorator.decorate(spec.getWorkerNetworkPolicy());
if (spec.getHorizontalPodAutoscaler().isPresent()) {
decorator.decorate(spec.getHorizontalPodAutoscaler().get());
}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
index 443760c..b3b38c4 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java
@@ -31,6 +31,7 @@ import java.time.Instant;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.k8s.operator.SparkCluster;
@@ -75,6 +76,8 @@ public class ClusterInitStep extends ClusterReconcileStep {
context.getClient().apps().statefulSets().resource(masterStatefulSet).create();
StatefulSet workerStatefulSet = context.getWorkerStatefulSetSpec();
context.getClient().apps().statefulSets().resource(workerStatefulSet).create();
+ NetworkPolicy workerNetworkPolicy = context.getWorkerNetworkPolicySpec();
+
context.getClient().network().networkPolicies().resource(workerNetworkPolicy).create();
var horizontalPodAutoscaler = context.getHorizontalPodAutoscalerSpec();
if (horizontalPodAutoscaler.isPresent()) {
context
diff --git
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
index 8962aaf..57bbb73 100644
---
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
+++
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java
@@ -38,6 +38,8 @@ import
io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerBui
import
io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpec;
import
io.fabric8.kubernetes.api.model.autoscaling.v2.HorizontalPodAutoscalerSpecBuilder;
import io.fabric8.kubernetes.api.model.autoscaling.v2.MetricSpecBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget;
import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder;
import lombok.Getter;
@@ -54,6 +56,7 @@ public class SparkClusterResourceSpec {
@Getter private final Service workerService;
@Getter private final StatefulSet masterStatefulSet;
@Getter private final StatefulSet workerStatefulSet;
+ @Getter private final NetworkPolicy workerNetworkPolicy;
@Getter private final Optional<HorizontalPodAutoscaler>
horizontalPodAutoscaler;
@Getter private final Optional<PodDisruptionBudget> podDisruptionBudget;
@@ -114,6 +117,7 @@ public class SparkClusterResourceSpec {
workerSpec.getStatefulSetSpec());
horizontalPodAutoscaler = buildHorizontalPodAutoscaler(clusterName,
namespace, spec);
podDisruptionBudget = buildPodDisruptionBudget(clusterName, namespace,
spec);
+ workerNetworkPolicy = buildWorkerNetworkPolicy(clusterName, namespace);
}
/**
@@ -456,4 +460,34 @@ public class SparkClusterResourceSpec {
.endSpec()
.build());
}
+
+ /**
+ * Builds the NetworkPolicy for the SparkCluster.
+ *
+ * @param clusterName The name of the SparkCluster.
+ * @param namespace The namespace of the SparkCluster.
+ * @return A NetworkPolicy object.
+ */
+ private NetworkPolicy buildWorkerNetworkPolicy(String clusterName, String
namespace) {
+ return new NetworkPolicyBuilder()
+ .withNewMetadata()
+ .withName(clusterName + "-worker")
+ .withNamespace(namespace)
+ .addToLabels(LABEL_SPARK_CLUSTER_NAME, clusterName)
+ .endMetadata()
+ .withNewSpec()
+ .withNewPodSelector()
+ .addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
+ .addToMatchLabels(LABEL_SPARK_CLUSTER_NAME, clusterName)
+ .endPodSelector()
+ .addNewIngress()
+ .addNewFrom()
+ .withNewPodSelector()
+ .addToMatchLabels(LABEL_SPARK_CLUSTER_NAME, clusterName)
+ .endPodSelector()
+ .endFrom()
+ .endIngress()
+ .endSpec()
+ .build();
+ }
}
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
index 675365c..63c4171 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkClusterResourceSpecTest.java
@@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.ServiceSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetSpec;
import io.fabric8.kubernetes.api.model.apps.StatefulSetSpecBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -316,6 +317,22 @@ class SparkClusterResourceSpecTest {
.get(LABEL_SPARK_VERSION_NAME));
}
+ @Test
+ void testWorkerNetworkPolicy() {
+ SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new
SparkConf());
+ NetworkPolicy policy = spec.getWorkerNetworkPolicy();
+ assertEquals("my-namespace", policy.getMetadata().getNamespace());
+ assertEquals("cluster-name-worker", policy.getMetadata().getName());
+ var expected = Map.of(
+ LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE,
+ LABEL_SPARK_CLUSTER_NAME, "cluster-name");
+ assertEquals(expected, policy.getSpec().getPodSelector().getMatchLabels());
+ assertTrue(policy.getSpec().getEgress().isEmpty());
+ assertEquals(1, policy.getSpec().getIngress().size());
+ assertEquals(Map.of(LABEL_SPARK_CLUSTER_NAME, "cluster-name"),
+
policy.getSpec().getIngress().get(0).getFrom().get(0).getPodSelector().getMatchLabels());
+ }
+
@Test
void testEmptyHorizontalPodAutoscalerByDefault() {
SparkClusterResourceSpec spec = new SparkClusterResourceSpec(cluster, new
SparkConf());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]