[
https://issues.apache.org/jira/browse/AIRFLOW-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596042#comment-16596042
]
ASF GitHub Bot commented on AIRFLOW-2854:
-----------------------------------------
Fokko closed pull request #3697: [AIRFLOW-2854] kubernetes_pod_operator add
more configuration items
URL: https://github.com/apache/incubator-airflow/pull/3697
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 27e0ebd29c..97bcdf2abc 100644
---
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -175,6 +175,11 @@ def extract_service_account_name(pod, req):
if pod.service_account_name:
req['spec']['serviceAccountName'] = pod.service_account_name
+ @staticmethod
+ def extract_hostnetwork(pod, req):
+ if pod.hostnetwork:
+ req['spec']['hostNetwork'] = pod.hostnetwork
+
@staticmethod
def extract_image_pull_secrets(pod, req):
if pod.image_pull_secrets:
diff --git
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
index 95d6c829de..877d7aafe2 100644
---
a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
+++
b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py
@@ -59,6 +59,7 @@ def create(self, pod):
self.extract_image_pull_secrets(pod, req)
self.extract_annotations(pod, req)
self.extract_affinity(pod, req)
+ self.extract_hostnetwork(pod, req)
return req
@@ -116,4 +117,5 @@ def create(self, pod):
self.extract_image_pull_secrets(pod, req)
self.extract_annotations(pod, req)
self.extract_affinity(pod, req)
+ self.extract_hostnetwork(pod, req)
return req
diff --git a/airflow/contrib/kubernetes/pod.py
b/airflow/contrib/kubernetes/pod.py
index 6fcf354459..221c8f4180 100644
--- a/airflow/contrib/kubernetes/pod.py
+++ b/airflow/contrib/kubernetes/pod.py
@@ -77,7 +77,8 @@ def __init__(
service_account_name=None,
resources=None,
annotations=None,
- affinity=None
+ affinity=None,
+ hostnetwork=False
):
self.image = image
self.envs = envs or {}
@@ -98,3 +99,4 @@ def __init__(
self.resources = resources or Resources()
self.annotations = annotations or {}
self.affinity = affinity or {}
+ self.hostnetwork = hostnetwork or False
diff --git a/airflow/contrib/kubernetes/pod_launcher.py
b/airflow/contrib/kubernetes/pod_launcher.py
index 42f2bfea8a..8c8d949107 100644
--- a/airflow/contrib/kubernetes/pod_launcher.py
+++ b/airflow/contrib/kubernetes/pod_launcher.py
@@ -22,7 +22,7 @@
from datetime import datetime as dt
from airflow.contrib.kubernetes.kubernetes_request_factory import \
pod_request_factory as pod_factory
-from kubernetes import watch
+from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
from airflow import AirflowException
@@ -59,6 +59,15 @@ def run_pod_async(self, pod):
raise
return resp
+ def delete_pod(self, pod):
+ try:
+ self._client.delete_namespaced_pod(
+ pod.name, pod.namespace, body=client.V1DeleteOptions())
+ except ApiException as e:
+ # If the pod is already deleted
+ if e.status != 404:
+ raise
+
def run_pod(self, pod, startup_timeout=120, get_logs=True):
# type: (Pod) -> (State, result)
"""
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py
b/airflow/contrib/operators/kubernetes_pod_operator.py
index fb905622d8..bb4bf7fca1 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -102,6 +102,7 @@ def execute(self, context):
labels=self.labels,
)
+ pod.service_account_name = self.service_account_name
pod.secrets = self.secrets
pod.envs = self.env_vars
pod.image_pull_policy = self.image_pull_policy
@@ -109,6 +110,7 @@ def execute(self, context):
pod.resources = self.resources
pod.affinity = self.affinity
pod.node_selectors = self.node_selectors
+ pod.hostnetwork = self.hostnetwork
launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.xcom_push)
@@ -116,6 +118,10 @@ def execute(self, context):
pod,
startup_timeout=self.startup_timeout_seconds,
get_logs=self.get_logs)
+
+ if self.is_delete_operator_pod:
+ launcher.delete_pod(pod)
+
if final_state != State.SUCCESS:
raise AirflowException(
'Pod returned a failure: {state}'.format(state=final_state)
@@ -148,6 +154,10 @@ def __init__(self,
config_file=None,
xcom_push=False,
node_selectors=None,
+ image_pull_secrets=None,
+ service_account_name="default",
+ is_delete_operator_pod=False,
+ hostnetwork=False,
*args,
**kwargs):
super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -172,3 +182,7 @@ def __init__(self,
self.xcom_push = xcom_push
self.resources = resources or Resources()
self.config_file = config_file
+ self.image_pull_secrets = image_pull_secrets
+ self.service_account_name = service_account_name
+ self.is_delete_operator_pod = is_delete_operator_pod
+ self.hostnetwork = hostnetwork
diff --git a/airflow/example_dags/example_kubernetes_operator.py
b/airflow/example_dags/example_kubernetes_operator.py
index e8d35c4c5b..4b3f54bd04 100644
--- a/airflow/example_dags/example_kubernetes_operator.py
+++ b/airflow/example_dags/example_kubernetes_operator.py
@@ -48,7 +48,8 @@
in_cluster=False,
task_id="task",
get_logs=True,
- dag=dag)
+ dag=dag,
+ is_delete_operator_pod=False)
except ImportError as e:
log.warn("Could not import KubernetesPodOperator: " + str(e))
diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst
index a4916858fc..4f6eeb14b2 100644
--- a/docs/kubernetes.rst
+++ b/docs/kubernetes.rst
@@ -91,7 +91,9 @@ Kubernetes Operator
volume_mounts=[volume_mount]
name="test",
task_id="task",
- affinity=affinity
+ affinity=affinity,
+ is_delete_operator_pod=True,
+ hostnetwork=False
)
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py
b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 5cb02d1ff1..595d7aa8b4 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -20,6 +20,7 @@
import shutil
from airflow.contrib.operators.kubernetes_pod_operator import
KubernetesPodOperator
from airflow import AirflowException
+from kubernetes.client.rest import ApiException
from subprocess import check_call
import mock
import json
@@ -93,6 +94,34 @@ def test_working_pod():
)
k.execute(None)
+ @staticmethod
+ def test_delete_operator_pod():
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ is_delete_operator_pod=True
+ )
+ k.execute(None)
+
+ @staticmethod
+ def test_pod_hostnetwork():
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ hostnetwork=True
+ )
+ k.execute(None)
+
@staticmethod
def test_pod_node_selectors():
node_selectors = {
@@ -200,10 +229,24 @@ def test_faulty_image(self):
task_id="task",
startup_timeout_seconds=5
)
- with self.assertRaises(AirflowException) as cm:
- k.execute(None),
+ with self.assertRaises(AirflowException):
+ k.execute(None)
- print("exception: {}".format(cm))
+ def test_faulty_service_account(self):
+ bad_service_account_name = "foobar"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo 10"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id="task",
+ startup_timeout_seconds=5,
+ service_account_name=bad_service_account_name
+ )
+ with self.assertRaises(ApiException):
+ k.execute(None)
def test_pod_failure(self):
"""
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> kubernetes_pod_operator add more configuration items
> ----------------------------------------------------
>
> Key: AIRFLOW-2854
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2854
> Project: Apache Airflow
> Issue Type: Improvement
> Components: contrib
> Affects Versions: 2.0.0
> Reporter: pengchen
> Assignee: pengchen
> Priority: Minor
> Fix For: 2.0.0
>
>
> kubernetes_pod_operator is missing kubernetes pods related configuration
> items, as follows:
> 1. image_pull_secrets
> _Pull secrets_ are used to _pull_ private container _images_ from registries.
> In this case, we need to configure image_pull_secrets in the pod spec file.
> 2. service_account_name
> When Kubernetes is running with RBAC authorization, a job that needs to
> operate on Kubernetes resources must be configured with a service account.
> 3. is_delete_operator_pod
> This option lets the user decide whether to delete the job pod created by
> pod_operator; pod deletion is currently not handled at all.
> 4. hostnetwork
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)