SadmiB opened a new issue, #28879:
URL: https://github.com/apache/airflow/issues/28879

   ### Apache Airflow version
   
   2.5.0
   
   ### What happened
   
   I'm running Airflow on Kubernetes. I schedule a PythonOperator via the KubernetesExecutor to download a dataset from S3, and I'm getting this error:
   
   ```
   [2023-01-12 07:42:14,502] {dagbag.py:538} INFO - Filling up the DagBag from /opt/airflow/dags/recognizer_pipeline.py
   [2023-01-12 07:42:18,298] {font_manager.py:1633} INFO - generated new fontManager
   Downloading https://ultralytics.com/assets/Arial.ttf to /home/airflow/.config/Ultralytics/Arial.ttf...
   [2023-01-12 07:42:19,779] {s3.py:48} INFO - Initializing S3 client
   [2023-01-12 07:42:19,800] {credentials.py:1108} INFO - Found credentials from IAM Role: high_load-eks-node-group-20220721173708602000000002
   [2023-01-12 07:42:19,904] {s3.py:48} INFO - Initializing S3 client
   [2023-01-12 07:42:21,855] {task_command.py:389} INFO - Running <TaskInstance: recognizer-pipeline.download_dataset.download_raw_weights manual__2023-01-12T07:41:48.392745+00:00 [queued]> on host recognizer-pipeline-download-d-3765e820b7594e71988db6c28c48ae66
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 39, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 52, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 108, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 396, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 193, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 252, in _run_task_by_local_task_job
       run_job.run()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 247, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 132, in _execute
       self.handle_task_exit(return_code)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 164, in handle_task_exit
       self.task_instance.schedule_downstream_tasks()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2562, in schedule_downstream_tasks
       partial_dag = task.dag.partial_subset(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2201, in partial_subset
       dag.task_dict = {
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2202, in <dictcomp>
       t.task_id: _deepcopy_task(t)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2199, in _deepcopy_task
       return copy.deepcopy(t, memo)
     File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
       y = copier(memo)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1166, in __deepcopy__
       setattr(result, k, copy.deepcopy(v, memo))
     File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
     File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
       state = deepcopy(state, memo)
     File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
       y = copier(x, memo)
     File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
     File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
       rv = reductor(4)
   TypeError: cannot pickle 'module' object
   ```
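   
   For what it's worth, the `cannot pickle 'module' object` part is easy to reproduce outside Airflow. `copy.deepcopy` falls back to pickling for objects it doesn't know how to copy (the `reductor(4)` frame in the traceback), and module objects can't be pickled, so any module reference sitting in an operator's `__dict__` would make the deepcopy in `partial_subset` fail exactly like this. A minimal sketch (plain Python, names are mine, not Airflow's):
   
   ```python
   import copy
   import json  # any module object will do
   
   class Holder:
       """Stands in for an operator whose __dict__ ends up holding a module."""
       def __init__(self):
           self.mod = json  # a module stored as an instance attribute
   
   copy.deepcopy(Holder())  # raises TypeError: cannot pickle 'module' object
   ```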
   
   When I check the task's logs in the webserver, I see this:
   
   ```
   *** Reading local file: /opt/airflow/logs/dag_id=recognizer-pipeline/run_id=manual__2023-01-12T07:41:48.392745+00:00/task_id=download_dataset.download_platesmania_test_dataset/attempt=1.log
   [2023-01-12, 07:42:19 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: recognizer-pipeline.download_dataset.download_platesmania_test_dataset manual__2023-01-12T07:41:48.392745+00:00 [queued]>
   [2023-01-12, 07:42:19 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: recognizer-pipeline.download_dataset.download_platesmania_test_dataset manual__2023-01-12T07:41:48.392745+00:00 [queued]>
   [2023-01-12, 07:42:19 UTC] {taskinstance.py:1283} INFO - 
   --------------------------------------------------------------------------------
   [2023-01-12, 07:42:19 UTC] {taskinstance.py:1284} INFO - Starting attempt 1 of 2
   [2023-01-12, 07:42:19 UTC] {taskinstance.py:1285} INFO - 
   --------------------------------------------------------------------------------
   [2023-01-12, 07:42:19 UTC] {taskinstance.py:1304} INFO - Executing <Task(PythonOperator): download_dataset.download_platesmania_test_dataset> on 2023-01-12 07:41:48.392745+00:00
   [2023-01-12, 07:42:19 UTC] {standard_task_runner.py:55} INFO - Started process 26 to run task
   [2023-01-12, 07:42:19 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'recognizer-pipeline', 'download_dataset.download_platesmania_test_dataset', 'manual__2023-01-12T07:41:48.392745+00:00', '--job-id', '57', '--raw', '--subdir', 'DAGS_FOLDER/recognizer_pipeline.py', '--cfg-path', '/tmp/tmp4pmu878r']
   [2023-01-12, 07:42:19 UTC] {standard_task_runner.py:83} INFO - Job 57: Subtask download_dataset.download_platesmania_test_dataset
   [2023-01-12, 07:42:19 UTC] {task_command.py:389} INFO - Running <TaskInstance: recognizer-pipeline.download_dataset.download_platesmania_test_dataset manual__2023-01-12T07:41:48.392745+00:00 [running]> on host recognizer-pipeline-download-d-ec7a86c95d6c442ca1e719f26129dd7e
   [2023-01-12, 07:42:20 UTC] {pod_generator.py:424} WARNING - Model file  does not exist
   [2023-01-12, 07:42:20 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1378, in _run_raw_task
       self._execute_task_with_callbacks(context, test_mode)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1504, in _execute_task_with_callbacks
       airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/operator_helpers.py", line 119, in context_to_airflow_vars
       params[mapping_value] = ",".join(_attr)
   TypeError: sequence item 0: expected str instance, NoneType found
   [2023-01-12, 07:42:20 UTC] {taskinstance.py:1322} INFO - Marking task as UP_FOR_RETRY. dag_id=recognizer-pipeline, task_id=download_dataset.download_platesmania_test_dataset, execution_date=20230112T074148, start_date=20230112T074219, end_date=20230112T074220
   [2023-01-12, 07:42:20 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 57 for task download_dataset.download_platesmania_test_dataset (sequence item 0: expected str instance, NoneType found; 26)
   [2023-01-12, 07:42:20 UTC] {local_task_job.py:159} INFO - Task exited with return code 1
   ```
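   
   This second error looks like a plain `str.join` over a sequence containing a `None`. My guess (unverified) is that one of the joined context attributes, which Airflow builds from list-valued things like the task owner/email, contains a `None` entry. A minimal illustration in plain Python, not Airflow code:
   
   ```python
   attrs = [None]  # hypothetical: a list attribute holding None instead of a string
   ",".join(attrs)  # TypeError: sequence item 0: expected str instance, NoneType found
   ```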
   
   ### What you think should happen instead
   
   The expected behavior is that the task runs in its Kubernetes worker pod and downloads the data to the mounted volume.
   
   ### How to reproduce
   
   Airflow scheduler and webserver:
   ```yaml
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: airflow
     namespace: airflow
   spec:
     replicas: 1
     selector:
       matchLabels:
         app: airflow
     template:
       metadata:
         labels:
           app: airflow
         name: airflow
       spec:
         serviceAccountName: airflow-scheduler
         containers:
           - name: airflow-scheduler
             image: ${TF_ECR_AIRFLOW}:latest
             imagePullPolicy: Always
             env:
               - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
                 valueFrom:
                   secretKeyRef:
                     name: airflow-staging-manual
                     key: SQLALCHEMY_DATABASE_URI
               - name: AIRFLOW__CORE__EXECUTOR
                 value: KubernetesExecutor
               - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
                 value: "True"
               - name: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY
                 value: ${TF_ECR_AIRFLOW}
               - name: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG
                 value: latest
               - name: ECR_AIRFLOW_GPU
                 valueFrom:
                   configMapKeyRef:
                     name: airflow-infrastructure
                     key: TF_ECR_AIRFLOW_GPU
               - name: AWS_ACCESS_KEY_ID_S3
                 valueFrom:
                   configMapKeyRef:
                     name: airflow-infrastructure-k8s
                     key: IAM_AIRFLOW_ID
               - name: AWS_SECRET_ACCESS_KEY_S3
                 valueFrom:
                   secretKeyRef:
                     name: airflow-staging-managed
                     key: IAM_SECRET
               - name: AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE
                 value: airflow
               - name: AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE
                 value: "/opt/airflow/pod-creator.yml"
               - name: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS
                 value: "True"
               - name: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE
                 value: "True"
             volumeMounts:
               - name: airflow-logs
                 mountPath: /opt/airflow/logs
             command:
               - airflow
             args:
               - scheduler
           - name: airflow-webserver
             env:
               - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
                 valueFrom:
                   secretKeyRef:
                     name: airflow-staging-manual
                     key: SQLALCHEMY_DATABASE_URI
             image: ${TF_ECR_AIRFLOW}:latest
             imagePullPolicy: Always
             ports:
               - containerPort: 8080
             command:
               - airflow
             args:
               - webserver
             volumeMounts:
               - name: airflow-logs
                 mountPath: /opt/airflow/logs
         restartPolicy: Always
         securityContext:
           runAsUser: 50000
           runAsGroup: 0
           fsGroup: 0
         volumes:
           - name: airflow-logs
             persistentVolumeClaim:
               claimName: airflow-logs
   ```
   
   Pod creator template:
   
   ```yaml
   apiVersion: v1
   kind: Pod
   metadata:
     name: airflow-worker
     namespace: airflow
   spec:
     containers: 
       - name: base 
         imagePullPolicy: Always
         image: 958824886822.dkr.ecr.eu-central-1.amazonaws.com/airflow:latest
         env: 
         - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
           valueFrom:
             secretKeyRef:
               name: airflow-staging-manual
               key: SQLALCHEMY_DATABASE_URI
         - name: AIRFLOW__CORE__EXECUTOR
           value: "LocalExecutor"
         - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
           value: "True"
         - name: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT
            value: "60.0"
         - name: AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE
           value: "airflow"
         - name: AIRFLOW__CORE__DAGS_FOLDER
           value: "/opt/airflow/dags"
         volumeMounts:
           - name: airflow-logs
             mountPath: /opt/airflow/logs
           - name: ml-training-data
             mountPath: /opt/airflow/data
     restartPolicy: Never
     securityContext:
       runAsUser: 50000
       runAsGroup: 0
       fsGroup: 0
     serviceAccountName: "airflow-scheduler"
     volumes:
       - name: airflow-logs
         persistentVolumeClaim:
           claimName: airflow-logs
       - name: ml-training-data
         persistentVolumeClaim:
           claimName: ml-training-data
    
   ```
   
   Example Dag:
   
   ```python
   from datetime import timedelta

   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   from airflow.utils.task_group import TaskGroup

   # downloader and config are project-local modules (S3 download helpers and
   # path constants); default_args is a plain dict defined alongside the DAG.

   with DAG(
       'detector-pipeline',
       default_args=default_args,
       description='End-to-end ML pipeline for detector',
       schedule_interval=timedelta(days=1),
       start_date=days_ago(1),
       tags=[],
   ) as dag:
       # task group 1: download weights and all datasets
       with TaskGroup('download_dataset') as download_dataset:
           PythonOperator(task_id='download_raw_weights',
                          python_callable=downloader.download_raw_weights)

           PythonOperator(task_id='download_camera_train_dataset',
                          python_callable=downloader.download_folder,
                          op_kwargs={'data_dir': config.CAMERA_TRAIN_DIR})

           PythonOperator(task_id='download_camera_test_dataset',
                          python_callable=downloader.download_folder,
                          op_kwargs={'data_dir': config.CAMERA_TEST_DIR})

           PythonOperator(task_id='download_platesmania_train_dataset',
                          python_callable=downloader.download_folder,
                          op_kwargs={'data_dir': config.PLATESMANIA_TRAIN_DIR})

           PythonOperator(task_id='download_platesmania_test_dataset',
                          python_callable=downloader.download_folder,
                          op_kwargs={'data_dir': config.PLATESMANIA_TEST_DIR})

           PythonOperator(task_id='download_augmentation_dataset',
                          python_callable=downloader.download_folder,
                          op_kwargs={'data_dir': config.AUGMENTATION_DIR})

           PythonOperator(task_id='download_problematic_dataset',
                          python_callable=downloader.download_folder,
                          op_kwargs={'data_dir': config.PROBLEMATIC_PLATES_DIR})
   ```
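   
   A possible mitigation I have not verified: since the deepcopy in `partial_subset` chokes on a module object, deferring the `downloader` import to run time might sidestep it, because the operator then only stores a plain function. A sketch (the `recognizer_pipeline` package name is an assumption about my layout):
   
   ```python
   def _download_folder(data_dir):
       # run-time import so no module object ends up stored on the operator
       from recognizer_pipeline import downloader  # assumed package layout
       downloader.download_folder(data_dir=data_dir)

   PythonOperator(task_id='download_camera_train_dataset',
                  python_callable=_download_folder,
                  op_kwargs={'data_dir': config.CAMERA_TRAIN_DIR})
   ```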
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==6.2.0
   apache-airflow-providers-celery==3.1.0
   apache-airflow-providers-cncf-kubernetes==5.0.0
   apache-airflow-providers-common-sql==1.3.1
   apache-airflow-providers-docker==3.3.0
   apache-airflow-providers-elasticsearch==4.3.1
   apache-airflow-providers-ftp==3.2.0
   apache-airflow-providers-google==8.6.0
   apache-airflow-providers-grpc==3.1.0
   apache-airflow-providers-hashicorp==3.2.0
   apache-airflow-providers-http==4.1.0
   apache-airflow-providers-imap==3.1.0
   apache-airflow-providers-microsoft-azure==5.0.0
   apache-airflow-providers-mysql==3.4.0
   apache-airflow-providers-odbc==3.2.1
   apache-airflow-providers-postgres==5.3.1
   apache-airflow-providers-redis==3.1.0
   apache-airflow-providers-sendgrid==3.1.0
   apache-airflow-providers-sftp==4.2.0
   apache-airflow-providers-slack==7.1.0
   apache-airflow-providers-sqlite==3.3.1
   apache-airflow-providers-ssh==3.3.0
   ```
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   I faced the same issue with Airflow 2.3.2.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

