SadmiB opened a new issue, #28879:
URL: https://github.com/apache/airflow/issues/28879
### Apache Airflow version
2.5.0
### What happened
I'm running Airflow on Kubernetes and scheduling a PythonOperator with the
Kubernetes executor to download a dataset from S3, and I'm getting this error:
```
[2023-01-12 07:42:14,502] {dagbag.py:538} INFO - Filling up the DagBag from /opt/airflow/dags/recognizer_pipeline.py
[2023-01-12 07:42:18,298] {font_manager.py:1633} INFO - generated new fontManager
Downloading https://ultralytics.com/assets/Arial.ttf to /home/airflow/.config/Ultralytics/Arial.ttf...
[2023-01-12 07:42:19,779] {s3.py:48} INFO - Initializing S3 client
[2023-01-12 07:42:19,800] {credentials.py:1108} INFO - Found credentials from IAM Role: high_load-eks-node-group-20220721173708602000000002
[2023-01-12 07:42:19,904] {s3.py:48} INFO - Initializing S3 client
[2023-01-12 07:42:21,855] {task_command.py:389} INFO - Running <TaskInstance: recognizer-pipeline.download_dataset.download_raw_weights manual__2023-01-12T07:41:48.392745+00:00 [queued]> on host recognizer-pipeline-download-d-3765e820b7594e71988db6c28c48ae66
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 396, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 193, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 252, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 247, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 132, in _execute
    self.handle_task_exit(return_code)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 164, in handle_task_exit
    self.task_instance.schedule_downstream_tasks()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2562, in schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2201, in partial_subset
    dag.task_dict = {
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2202, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 2199, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1166, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 172, in deepcopy
    y = _reconstruct(x, memo, *rv)
  File "/usr/local/lib/python3.8/copy.py", line 270, in _reconstruct
    state = deepcopy(state, memo)
  File "/usr/local/lib/python3.8/copy.py", line 146, in deepcopy
    y = copier(x, memo)
  File "/usr/local/lib/python3.8/copy.py", line 230, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle 'module' object
```
When I check the task's logs in the webserver, I find this:
```
*** Reading local file: /opt/airflow/logs/dag_id=recognizer-pipeline/run_id=manual__2023-01-12T07:41:48.392745+00:00/task_id=download_dataset.download_platesmania_test_dataset/attempt=1.log
[2023-01-12, 07:42:19 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: recognizer-pipeline.download_dataset.download_platesmania_test_dataset manual__2023-01-12T07:41:48.392745+00:00 [queued]>
[2023-01-12, 07:42:19 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: recognizer-pipeline.download_dataset.download_platesmania_test_dataset manual__2023-01-12T07:41:48.392745+00:00 [queued]>
[2023-01-12, 07:42:19 UTC] {taskinstance.py:1283} INFO -
--------------------------------------------------------------------------------
[2023-01-12, 07:42:19 UTC] {taskinstance.py:1284} INFO - Starting attempt 1 of 2
[2023-01-12, 07:42:19 UTC] {taskinstance.py:1285} INFO -
--------------------------------------------------------------------------------
[2023-01-12, 07:42:19 UTC] {taskinstance.py:1304} INFO - Executing <Task(PythonOperator): download_dataset.download_platesmania_test_dataset> on 2023-01-12 07:41:48.392745+00:00
[2023-01-12, 07:42:19 UTC] {standard_task_runner.py:55} INFO - Started process 26 to run task
[2023-01-12, 07:42:19 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'recognizer-pipeline', 'download_dataset.download_platesmania_test_dataset', 'manual__2023-01-12T07:41:48.392745+00:00', '--job-id', '57', '--raw', '--subdir', 'DAGS_FOLDER/recognizer_pipeline.py', '--cfg-path', '/tmp/tmp4pmu878r']
[2023-01-12, 07:42:19 UTC] {standard_task_runner.py:83} INFO - Job 57: Subtask download_dataset.download_platesmania_test_dataset
[2023-01-12, 07:42:19 UTC] {task_command.py:389} INFO - Running <TaskInstance: recognizer-pipeline.download_dataset.download_platesmania_test_dataset manual__2023-01-12T07:41:48.392745+00:00 [running]> on host recognizer-pipeline-download-d-ec7a86c95d6c442ca1e719f26129dd7e
[2023-01-12, 07:42:20 UTC] {pod_generator.py:424} WARNING - Model file does not exist
[2023-01-12, 07:42:20 UTC] {taskinstance.py:1772} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1378, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1504, in _execute_task_with_callbacks
    airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/operator_helpers.py", line 119, in context_to_airflow_vars
    params[mapping_value] = ",".join(_attr)
TypeError: sequence item 0: expected str instance, NoneType found
[2023-01-12, 07:42:20 UTC] {taskinstance.py:1322} INFO - Marking task as UP_FOR_RETRY. dag_id=recognizer-pipeline, task_id=download_dataset.download_platesmania_test_dataset, execution_date=20230112T074148, start_date=20230112T074219, end_date=20230112T074220
[2023-01-12, 07:42:20 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 57 for task download_dataset.download_platesmania_test_dataset (sequence item 0: expected str instance, NoneType found; 26)
[2023-01-12, 07:42:20 UTC] {local_task_job.py:159} INFO - Task exited with return code 1
```
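Looking at the first traceback, the `cannot pickle 'module' object` part appears to come from `copy.deepcopy` hitting a bare module reference stored somewhere on one of the tasks (my guess is one of my project helpers such as `downloader`, but I'm not certain). A minimal sketch outside Airflow that fails the same way:
```python
import copy
import json  # stand-in: any module object reproduces it


class Helper:
    def __init__(self):
        # A plain module reference stored on an instance that later ends up
        # inside an operator attribute (e.g. through op_kwargs).
        self.mod = json


copy.deepcopy({"helper": Helper()})
# TypeError: cannot pickle 'module' object
```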
### What you think should happen instead
The expected behavior is that the task runs as a Kubernetes worker pod and downloads
the data to the mounted volume.
### How to reproduce
Airflow scheduler and webserver:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow
  template:
    metadata:
      labels:
        app: airflow
      name: airflow
    spec:
      serviceAccountName: airflow-scheduler
      containers:
        - name: airflow-scheduler
          image: ${TF_ECR_AIRFLOW}:latest
          imagePullPolicy: Always
          env:
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-staging-manual
                  key: SQLALCHEMY_DATABASE_URI
            - name: AIRFLOW__CORE__EXECUTOR
              value: KubernetesExecutor
            - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
              value: "True"
            - name: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY
              value: ${TF_ECR_AIRFLOW}
            - name: AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG
              value: latest
            - name: ECR_AIRFLOW_GPU
              valueFrom:
                configMapKeyRef:
                  name: airflow-infrastructure
                  key: TF_ECR_AIRFLOW_GPU
            - name: AWS_ACCESS_KEY_ID_S3
              valueFrom:
                configMapKeyRef:
                  name: airflow-infrastructure-k8s
                  key: IAM_AIRFLOW_ID
            - name: AWS_SECRET_ACCESS_KEY_S3
              valueFrom:
                secretKeyRef:
                  name: airflow-staging-managed
                  key: IAM_SECRET
            - name: AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE
              value: airflow
            - name: AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE
              value: "/opt/airflow/pod-creator.yml"
            - name: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS
              value: "True"
            - name: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE
              value: "True"
          volumeMounts:
            - name: airflow-logs
              mountPath: /opt/airflow/logs
          command:
            - airflow
          args:
            - scheduler
        - name: airflow-webserver
          env:
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-staging-manual
                  key: SQLALCHEMY_DATABASE_URI
          image: ${TF_ECR_AIRFLOW}:latest
          imagePullPolicy: Always
          ports:
            - containerPort: 8080
          command:
            - airflow
          args:
            - webserver
          volumeMounts:
            - name: airflow-logs
              mountPath: /opt/airflow/logs
      restartPolicy: Always
      securityContext:
        runAsUser: 50000
        runAsGroup: 0
        fsGroup: 0
      volumes:
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs
```
Worker pod template (`/opt/airflow/pod-creator.yml`):
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  containers:
    - name: base
      imagePullPolicy: Always
      image: 958824886822.dkr.ecr.eu-central-1.amazonaws.com/airflow:latest
      env:
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-staging-manual
              key: SQLALCHEMY_DATABASE_URI
        - name: AIRFLOW__CORE__EXECUTOR
          value: "LocalExecutor"
        - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
          value: "True"
        - name: AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT
          value: "60.0"
        - name: AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE
          value: "airflow"
        - name: AIRFLOW__CORE__DAGS_FOLDER
          value: "/opt/airflow/dags"
      volumeMounts:
        - name: airflow-logs
          mountPath: /opt/airflow/logs
        - name: ml-training-data
          mountPath: /opt/airflow/data
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    runAsGroup: 0
    fsGroup: 0
  serviceAccountName: "airflow-scheduler"
  volumes:
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs
    - name: ml-training-data
      persistentVolumeClaim:
        claimName: ml-training-data
```
Example DAG:
```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

# project-local helpers (default_args is defined elsewhere in the file)
import config
import downloader

with DAG(
    'detector-pipeline',
    default_args=default_args,
    description='End-to-end ML pipeline for detector',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    tags=[],
) as dag:
    # task 1
    with TaskGroup('download_dataset') as download_dataset:
        PythonOperator(task_id='download_raw_weights',
                       python_callable=downloader.download_raw_weights)
        PythonOperator(task_id='download_camera_train_dataset',
                       python_callable=downloader.download_folder,
                       op_kwargs={'data_dir': config.CAMERA_TRAIN_DIR})
        PythonOperator(task_id='download_camera_test_dataset',
                       python_callable=downloader.download_folder,
                       op_kwargs={'data_dir': config.CAMERA_TEST_DIR})
        PythonOperator(task_id='download_platesmania_train_dataset',
                       python_callable=downloader.download_folder,
                       op_kwargs={'data_dir': config.PLATESMANIA_TRAIN_DIR})
        PythonOperator(task_id='download_platesmania_test_dataset',
                       python_callable=downloader.download_folder,
                       op_kwargs={'data_dir': config.PLATESMANIA_TEST_DIR})
        PythonOperator(task_id='download_augmentation_dataset',
                       python_callable=downloader.download_folder,
                       op_kwargs={'data_dir': config.AUGMENTATION_DIR})
        PythonOperator(task_id='download_problematic_dataset',
                       python_callable=downloader.download_folder,
                       op_kwargs={'data_dir': config.PROBLEMATIC_PLATES_DIR})
```
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
```
apache-airflow-providers-amazon==6.2.0
apache-airflow-providers-celery==3.1.0
apache-airflow-providers-cncf-kubernetes==5.0.0
apache-airflow-providers-common-sql==1.3.1
apache-airflow-providers-docker==3.3.0
apache-airflow-providers-elasticsearch==4.3.1
apache-airflow-providers-ftp==3.2.0
apache-airflow-providers-google==8.6.0
apache-airflow-providers-grpc==3.1.0
apache-airflow-providers-hashicorp==3.2.0
apache-airflow-providers-http==4.1.0
apache-airflow-providers-imap==3.1.0
apache-airflow-providers-microsoft-azure==5.0.0
apache-airflow-providers-mysql==3.4.0
apache-airflow-providers-odbc==3.2.1
apache-airflow-providers-postgres==5.3.1
apache-airflow-providers-redis==3.1.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-sftp==4.2.0
apache-airflow-providers-slack==7.1.0
apache-airflow-providers-sqlite==3.3.1
apache-airflow-providers-ssh==3.3.0
```
### Deployment
Other
### Deployment details
_No response_
### Anything else
I faced the same issue with Airflow 2.3.2.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)