This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 7ebd913 [AIRFLOW-3152] Kubernetes Pod Operator should support init
containers. (#6196)
7ebd913 is described below
commit 7ebd91380114d372fe0d114a2189905a4ad8cf70
Author: Marina Pereira <[email protected]>
AuthorDate: Tue Dec 17 00:14:24 2019 -0500
[AIRFLOW-3152] Kubernetes Pod Operator should support init containers.
(#6196)
* Add support for init-containers to Kubernetes Pod Operator
Enables start-up related code to be added for an app container in K8s Pod
operator.
Add new init_container resource that can be attached to the K8s Pod.
* Update init_container and fix tests
Fix the error in init_container and the associated tests.
* Refactor and fix tests
Fix tests for init_containers
* Fix init_container test errors
Remove unused mocks in init_container test
* Fix init_container test errors
Update volume mount object used in init_container test
* Fix init_container test errors
Add the missing volume setup for the init_container test.
* Fix init_container test failure.
Fix the expected result in the init_container test.
* Fix init_container test failures
Update expected results in the init_container tests
* Update the KubernetesPodOperator guide
Update the KubernetesPodOperator guide to document support for init
containers
* Update init-container tests
Fix test failures casued due python versions by sorting the output before
assert test.
* Update init-container to use k8s V1Container object
Remove custom object InitContainer.
Allow users to pass List[k8s.V1Container] as init-container in
K8sPodOperator
* Add missing init_containers initalization in K8s pod operator
Due to rebase from master, certain sections of the
kubernetes_pod_operator.py file was refactored which led to missing
init_containers initalization in K8s pod operator. Add missing init_containers
initalization in K8s pod operator. Update kubernetes pod operator
configurations in init container test.
(cherry picked from commit 4e1b0aa7c52e79eeb99698e6190e6f39ca3cab7f)
---
.../contrib/operators/kubernetes_pod_operator.py | 5 +-
docs/howto/operator/kubernetes.rst | 155 +++++++++++++++++++++
2 files changed, 159 insertions(+), 1 deletion(-)
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py
b/airflow/contrib/operators/kubernetes_pod_operator.py
index 41f0df3..e64a912 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -138,6 +138,8 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
:type do_xcom_push: bool
+ :param init_containers: init container for the launched Pod
+ :type init_containers: list[kubernetes.client.models.V1Container]
:param pod_template_file: path to pod template file
:type pod_template_file: str
"""
@@ -178,11 +180,11 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
dnspolicy=None,
schedulername=None,
full_pod_spec=None,
- init_containers=None,
log_events_on_failure=False,
do_xcom_push=False,
pod_template_file=None,
priority_class_name=None,
+ init_containers=None,
*args,
**kwargs):
if kwargs.get('xcom_push') is not None:
@@ -224,6 +226,7 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
self.schedulername = schedulername
self.full_pod_spec = full_pod_spec
self.init_containers = init_containers or []
+ self.init_containers = init_containers or []
self.log_events_on_failure = log_events_on_failure
self.pod_template_file = pod_template_file
self.priority_class_name = priority_class_name
diff --git a/docs/howto/operator/kubernetes.rst
b/docs/howto/operator/kubernetes.rst
new file mode 100644
index 0000000..f612cce
--- /dev/null
+++ b/docs/howto/operator/kubernetes.rst
@@ -0,0 +1,155 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+.. _howto/operator:KubernetesPodOperator:
+
+Kubernetes Operator
+===================
+
+The
:class:`airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator`
allows you to create
+Pods on Kubernetes. It works with any type of executor.
+
+.. code:: python
+
+ import kubernetes.client.models as k8s
+
+ from airflow.contrib.operators.kubernetes_pod_operator import
KubernetesPodOperator
+ from airflow.contrib.kubernetes.secret import Secret
+ from airflow.contrib.kubernetes.volume import Volume
+ from airflow.contrib.kubernetes.volume_mount import VolumeMount
+ from airflow.contrib.kubernetes.pod import Port
+
+
+ secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets',
'sql_alchemy_conn')
+ secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets',
'sql_alchemy_conn')
+ secret_all_keys = Secret('env', None, 'airflow-secrets-2')
+ volume_mount = VolumeMount('test-volume',
+ mount_path='/root/mount_file',
+ sub_path=None,
+ read_only=True)
+ port = Port('http', 80)
+ configmaps = ['test-configmap-1', 'test-configmap-2']
+
+ volume_config= {
+ 'persistentVolumeClaim':
+ {
+ 'claimName': 'test-volume'
+ }
+ }
+ volume = Volume(name='test-volume', configs=volume_config)
+
+ init_container_volume_mounts = [k8s.V1VolumeMount(
+ mount_path='/etc/foo',
+ name='test-volume',
+ sub_path=None,
+ read_only=True
+ )]
+
+ init_environments = [k8s.V1EnvVar(
+ name='key1',
+ value='value1'
+ ), k8s.V1EnvVar(
+ name='key2',
+ value='value2'
+ )]
+
+ init_container = k8s.V1Container(
+ name="init-container",
+ image="ubuntu:16.04",
+ env=init_environments,
+ volume_mounts=init_container_volume_mounts,
+ command=["bash", "-cx"],
+ args=["echo 10"]
+ )
+
+ affinity = {
+ 'nodeAffinity': {
+ 'preferredDuringSchedulingIgnoredDuringExecution': [
+ {
+ "weight": 1,
+ "preference": {
+ "matchExpressions": {
+ "key": "disktype",
+ "operator": "In",
+ "values": ["ssd"]
+ }
+ }
+ }
+ ]
+ },
+ "podAffinity": {
+ "requiredDuringSchedulingIgnoredDuringExecution": [
+ {
+ "labelSelector": {
+ "matchExpressions": [
+ {
+ "key": "security",
+ "operator": "In",
+ "values": ["S1"]
+ }
+ ]
+ },
+ "topologyKey": "failure-domain.beta.kubernetes.io/zone"
+ }
+ ]
+ },
+ "podAntiAffinity": {
+ "requiredDuringSchedulingIgnoredDuringExecution": [
+ {
+ "labelSelector": {
+ "matchExpressions": [
+ {
+ "key": "security",
+ "operator": "In",
+ "values": ["S2"]
+ }
+ ]
+ },
+ "topologyKey": "kubernetes.io/hostname"
+ }
+ ]
+ }
+ }
+
+ tolerations = [
+ {
+ 'key': "key",
+ 'operator': 'Equal',
+ 'value': 'value'
+ }
+ ]
+
+ k = KubernetesPodOperator(namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["echo", "10"],
+ labels={"foo": "bar"},
+ secrets=[secret_file, secret_env,
secret_all_keys],
+ ports=[port]
+ volumes=[volume],
+ volume_mounts=[volume_mount],
+ name="test",
+ task_id="task",
+ affinity=affinity,
+ is_delete_operator_pod=True,
+ hostnetwork=False,
+ tolerations=tolerations,
+ configmaps=configmaps,
+ init_containers=[init_container],
+ )