leoblanc opened a new issue #11281:
URL: https://github.com/apache/airflow/issues/11281


   **Apache Airflow version**:
   
   apache/airflow:1.10.12-python3.6
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   EKS 1.17
   
   **Environment**:
   
   I'm using the official Airflow Helm Chart (https://github.com/helm/charts/tree/master/stable/airflow).
   
   - **Cloud provider or hardware configuration**: AWS (EKS)
   
   **What happened**:
   
   The KubernetesPodOperator fails when I use volume mounts. These are the task logs:
   
   ```
   *** Log file does not exist: /opt/airflow/logs/company_podoperator_308959/passing-task/2020-10-02T14:16:28.462953+00:00/1.log
   *** Fetching from: http://airflow-worker-0.airflow-worker.airflow.svc.cluster.local:8793/log/company_podoperator_308959/passing-task/2020-10-02T14:16:28.462953+00:00/1.log

   [2020-10-02 14:16:46,331] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: company_podoperator_308959.passing-task 2020-10-02T14:16:28.462953+00:00 [None]>
   [2020-10-02 14:16:46,341] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: company_podoperator_308959.passing-task 2020-10-02T14:16:28.462953+00:00 [None]>
   [2020-10-02 14:16:46,342] {taskinstance.py:880} INFO - 
   --------------------------------------------------------------------------------
   [2020-10-02 14:16:46,342] {taskinstance.py:881} INFO - Starting attempt 1 of 2
   [2020-10-02 14:16:46,342] {taskinstance.py:882} INFO - 
   --------------------------------------------------------------------------------
   [2020-10-02 14:16:46,357] {taskinstance.py:901} INFO - Executing <Task(KubernetesPodOperator): passing-task> on 2020-10-02T14:16:28.462953+00:00
   [2020-10-02 14:16:46,363] {standard_task_runner.py:54} INFO - Started process 53 to run task
   [2020-10-02 14:16:46,397] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'company_podoperator_308959', 'passing-task', '2020-10-02T14:16:28.462953+00:00', '--job_id', '1363', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/podoperator-company.py', '--cfg_path', '/tmp/tmpzx8i4_rj']
   [2020-10-02 14:16:46,399] {standard_task_runner.py:78} INFO - Job 1363: Subtask passing-task
   [2020-10-02 14:16:46,523] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: company_podoperator_308959.passing-task 2020-10-02T14:16:28.462953+00:00 [running]> airflow-worker-0.airflow-worker.airflow.svc.cluster.local
   [2020-10-02 14:16:46,599] {taskinstance.py:1150} ERROR - 'V1VolumeMount' object has no attribute 'attach_to_pod'
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 979, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 284, in execute
       final_state, _, result = self.create_new_pod_for_operator(labels, launcher)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 394, in create_new_pod_for_operator
       self.volume_mounts  # type: ignore
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/k8s_model.py", line 77, in append_to_pod
       new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
     File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/k8s_model.py", line 77, in <lambda>
       new_pod = reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)
   AttributeError: 'V1VolumeMount' object has no attribute 'attach_to_pod'
   [2020-10-02 14:16:46,601] {taskinstance.py:1194} INFO - Marking task as UP_FOR_RETRY. dag_id=company_podoperator_308959, task_id=passing-task, execution_date=20201002T141628, start_date=20201002T141646, end_date=20201002T141646
   [2020-10-02 14:16:51,348] {local_task_job.py:102} INFO - Task exited with return code 1
   ```
   
   The main error is:
   
   ```
   AttributeError: 'V1VolumeMount' object has no attribute 'attach_to_pod'
   ```
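   My reading of the traceback: in 1.10.x, `append_to_pod` in `airflow/kubernetes/k8s_model.py` reduces over the supplied objects and calls `attach_to_pod` on each one, and that method exists only on Airflow's own wrapper classes, not on the raw `kubernetes.client` models such as `k8s.V1VolumeMount`. A minimal, self-contained sketch of that failure mode (the class names below are stand-ins I made up for illustration, not the real Airflow or kubernetes-client classes):

   ```python
   from functools import reduce


   class WrappedVolumeMount:
       """Stand-in for an Airflow wrapper object: it knows how to attach itself."""

       def __init__(self, name, mount_path):
           self.name = name
           self.mount_path = mount_path

       def attach_to_pod(self, pod):
           # Record the mount on the (dict-based) pod and hand the pod back.
           pod.setdefault('volume_mounts', []).append(self.mount_path)
           return pod


   class RawVolumeMount:
       """Stand-in for kubernetes.client's V1VolumeMount: plain data, no attach_to_pod."""

       def __init__(self, name, mount_path):
           self.name = name
           self.mount_path = mount_path


   def append_to_pod(pod, k8s_objects):
       # Mirrors the reduce() at airflow/kubernetes/k8s_model.py line 77 in the traceback.
       return reduce(lambda p, o: o.attach_to_pod(p), k8s_objects, pod)


   pod = append_to_pod({}, [WrappedVolumeMount('v', '/data')])
   print(pod)  # {'volume_mounts': ['/data']}

   try:
       append_to_pod({}, [RawVolumeMount('v', '/data')])
   except AttributeError as err:
       print(err)  # 'RawVolumeMount' object has no attribute 'attach_to_pod'
   ```

   If that reading is correct, the 1.10-era wrapper objects (e.g. `airflow.kubernetes.volume_mount.VolumeMount`) would attach cleanly where the raw `k8s` models raise the same `AttributeError` as in the logs above.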
   
   **What you expected to happen**:
   
   The KubernetesPodOperator should launch the pod successfully; I expect to see no errors in the logs.
   
   **How to reproduce it**:
   
   Create a DAG like this one:
   
   ```python
   from airflow import DAG
   from datetime import datetime, timedelta
   from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.contrib.kubernetes.secret import Secret
   from kubernetes.client import models as k8s

   TASKS_TIMEOUT = timedelta(hours=1)

   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': datetime.utcnow(),
       'email': ['[email protected]'],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5)
   }

   dag = DAG(
       'company_podoperator_308959', default_args=default_args,
       schedule_interval=timedelta(minutes=10))

   start = DummyOperator(task_id='run_this_first', dag=dag)

   # Volumes to use in the init container (2 configmaps and an empty dir for the generated file)
   # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Volume.md
   init_container_volumes = [
       k8s.V1Volume(name='consul-commonini-config',
                    config_map=k8s.V1ConfigMapVolumeSource(name='consul-commonini-config')),
       k8s.V1Volume(name='consul-commonini-data',
                    config_map=k8s.V1ConfigMapVolumeSource(name='consul-commonini-data')),
       k8s.V1Volume(name='common-ini-generated',
                    empty_dir=k8s.V1EmptyDirVolumeSource()),
   ]

   # Volume mounts for the init container
   # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1VolumeMount.md
   init_container_volume_mounts = [
       k8s.V1VolumeMount(name='consul-commonini-config',
                         mount_path='/consul-template/config/config.hcl',
                         sub_path='config.hcl', read_only=True),
       k8s.V1VolumeMount(name='consul-commonini-data',
                         mount_path='/consul-template/data/common.ini.ctmpl',
                         sub_path='common.ini.ctmpl', read_only=True),
       k8s.V1VolumeMount(name='common-ini-generated',
                         mount_path='/var/lib/secrets/common-ini',
                         sub_path=None, read_only=False),
   ]

   # Init container with volume mounts
   # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Container.md
   init_container = k8s.V1Container(
       name="common-ini",
       image="hashicorp/consul-template:latest",  # https://hub.docker.com/r/hashicorp/consul-template/tags
       volume_mounts=init_container_volume_mounts,
   )

   # Volume mounts for the airflow container (mounting the emptydir volume defined above)
   airflow_volume_mounts = [
       k8s.V1VolumeMount(name='common-ini-generated',
                         mount_path='/var/lib/secrets/common-ini',
                         sub_path=None, read_only=False),
   ]

   # Please note that we are using the cpr.py entrypoint script and providing
   # the company id (cid) using the '-c' parameter. For details see
   # https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html
   cid = 308959
   passing = KubernetesPodOperator(
       dag=dag,
       name="passing-test",
       task_id='passing-task',
       namespace='airflow',
       image="541301572502.dkr.ecr.us-west-2.amazonaws.com/airflow:v6.2",
       cmds=["python3"],
       arguments=["/opt/bg/analytics/src/python3/reporting/etl/scripts/cpr.py", "-c", str(cid)],
       volume_mounts=airflow_volume_mounts,
       get_logs=True,
       execution_timeout=TASKS_TIMEOUT,
   )

   failing = KubernetesPodOperator(
       namespace='airflow',
       image="ubuntu:1604",
       cmds=["python", "-c"],
       arguments=["print('this failed')"],
       labels={"foo": "bar"},
       name="fail",
       task_id="failing-task",
       get_logs=True,
       dag=dag,
   )

   passing.set_upstream(start)
   failing.set_upstream(start)
   ```
   
   Thank you very much in advance!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

