This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 5f2afca [SPARK-55085] Support `NetworkPolicy` for `SparkApplication`
5f2afca is described below
commit 5f2afca5454b4445865e81424313169dd0490e82
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Jan 17 22:51:59 2026 +0900
[SPARK-55085] Support `NetworkPolicy` for `SparkApplication`
### What changes were proposed in this pull request?
This PR aims to support `NetworkPolicy` for `SparkApplication`.
### Why are the changes needed?
To enhance the security of `SparkApplication` executor pods.
`NetworkPolicy` is frequently used in production to isolate Spark
applications.
- https://kubernetes.io/docs/concepts/services-networking/network-policies/
### Does this PR introduce _any_ user-facing change?
The executor pods of a `SparkApplication` will allow ingress from the driver
or executors of the same `SparkApplication`.
### How was this patch tested?
Pass the CIs with the newly added test case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #465 from dongjoon-hyun/SPARK-55085.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../templates/operator-rbac.yaml | 1 +
.../spark/k8s/operator/SparkAppResourceSpec.java | 33 +++++++++++++
.../k8s/operator/SparkAppResourceSpecTest.java | 55 +++++++++++++++++++++-
3 files changed, 88 insertions(+), 1 deletion(-)
diff --git
a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
index 55840c4..59d155b 100644
--- a/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
+++ b/build-tools/helm/spark-kubernetes-operator/templates/operator-rbac.yaml
@@ -50,6 +50,7 @@ rules:
- "networking.k8s.io"
resources:
- ingresses
+ - networkpolicies
verbs:
- '*'
- apiGroups:
diff --git
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
index 51b305f..14644e4 100644
---
a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
+++
b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
@@ -29,6 +29,8 @@ import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
import lombok.Getter;
import org.apache.spark.deploy.k8s.Config;
@@ -96,6 +98,7 @@ public class SparkAppResourceSpec {
KubernetesClientUtils.buildConfigMapJava(
kubernetesDriverConf.configMapNameDriver(), confFilesMap,
Map.of()));
this.driverPreResources.addAll(ConfigMapSpecUtils.buildConfigMaps(configMapSpecs));
+
this.driverPreResources.add(buildNetworkPolicy(kubernetesDriverConf.appId(),
namespace));
this.driverResources.addAll(configureDriverServerIngress(sparkPod,
driverServiceIngressList));
this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace));
this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace));
@@ -165,4 +168,34 @@ public class SparkAppResourceSpec {
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
+
+ /**
+ * Builds the NetworkPolicy that selects the executor pods of the SparkApplication.
+ *
+ * @param appId The application ID, used for the policy name, its label, and the pod selectors.
+ * @param namespace The namespace set on the NetworkPolicy metadata.
+ * @return A NetworkPolicy with an ingress rule from pods labeled with the same application ID.
+ */
+ private NetworkPolicy buildNetworkPolicy(String appId, String namespace) {
+ return new NetworkPolicyBuilder()
+ .withNewMetadata()
+ .withName(appId + "-network-policy")
+ .withNamespace(namespace)
+
.addToLabels(org.apache.spark.k8s.operator.Constants.LABEL_SPARK_APPLICATION_NAME,
appId)
+ .endMetadata()
+ .withNewSpec()
+ .withNewPodSelector()
+ .addToMatchLabels(Constants.SPARK_ROLE_LABEL(),
Constants.SPARK_POD_EXECUTOR_ROLE())
+ .addToMatchLabels(Constants.SPARK_APP_ID_LABEL(), appId)
+ .endPodSelector()
+ .addNewIngress()
+ .addNewFrom()
+ .withNewPodSelector()
+ .addToMatchLabels(Constants.SPARK_APP_ID_LABEL(), appId)
+ .endPodSelector()
+ .endFrom()
+ .endIngress()
+ .endSpec()
+ .build();
+ }
}
diff --git
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
index 0dfc019..2eee28d 100644
---
a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
+++
b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
@@ -36,6 +36,8 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy;
+import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicyBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -72,7 +74,7 @@ class SparkAppResourceSpecTest {
new SparkAppResourceSpec(mockConf, mockSpec, List.of(), List.of());
Assertions.assertEquals(2, appResourceSpec.getDriverResources().size());
- Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+ Assertions.assertEquals(2, appResourceSpec.getDriverPreResources().size());
Assertions.assertEquals(Pod.class,
appResourceSpec.getDriverResources().get(0).getClass());
Assertions.assertEquals(
ConfigMap.class,
appResourceSpec.getDriverResources().get(1).getClass());
@@ -110,6 +112,57 @@ class SparkAppResourceSpecTest {
Assertions.assertEquals(proposedConfigVolume.getName(),
proposedConfigVolumeMount.getName());
}
+ @Test
+ void testNetworkPolicy() {
+ SparkAppDriverConf mockConf = mock(SparkAppDriverConf.class);
+ when(mockConf.appId()).thenReturn("app1");
+ when(mockConf.configMapNameDriver()).thenReturn("foo-network-policy");
+ when(mockConf.sparkConf())
+ .thenReturn(new SparkConf().set("spark.kubernetes.namespace",
"foo-namespace"));
+
+ KubernetesDriverSpec mockSpec = mock(KubernetesDriverSpec.class);
+ Pod driver = buildBasicPod("driver");
+ SparkPod sparkPod = new SparkPod(driver, buildBasicContainer());
+
+ // Stub the driver spec with empty resources and pre-resources
+ Seq<HasMetadata> empty =
CollectionConverters.asScala(List.<HasMetadata>of()).toList();
+ when(mockSpec.driverKubernetesResources()).thenReturn(empty);
+ when(mockSpec.driverPreKubernetesResources()).thenReturn(empty);
+ when(mockSpec.pod()).thenReturn(sparkPod);
+ when(mockSpec.systemProperties()).thenReturn(new HashMap<>());
+
+ SparkAppResourceSpec appResourceSpec =
+ new SparkAppResourceSpec(mockConf, mockSpec, List.of(), List.of());
+
+ Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+ Assertions.assertEquals(NetworkPolicy.class,
+ appResourceSpec.getDriverPreResources().get(0).getClass());
+
+ NetworkPolicy expected = new NetworkPolicyBuilder()
+ .withNewMetadata()
+ .withName("app1-network-policy")
+ .withNamespace("foo-namespace")
+ .addToLabels(Constants.LABEL_SPARK_APPLICATION_NAME, "app1")
+ .endMetadata()
+ .withNewSpec()
+ .withNewPodSelector()
+ .addToMatchLabels(Constants.LABEL_SPARK_ROLE_NAME,
+ Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE)
+ .addToMatchLabels("spark-app-selector", "app1")
+ .endPodSelector()
+ .addNewIngress()
+ .addNewFrom()
+ .withNewPodSelector()
+ .addToMatchLabels("spark-app-selector", "app1")
+ .endPodSelector()
+ .endFrom()
+ .endIngress()
+ .endSpec()
+ .build();
+ Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+ Assertions.assertEquals(expected,
appResourceSpec.getDriverPreResources().get(0));
+ }
+
protected Container buildBasicContainer() {
return new ContainerBuilder()
.withName("foo-container")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]