zhangxiao696 opened a new issue #12980:
URL: https://github.com/apache/airflow/issues/12980
When I configure a DAG for the KubernetesExecutor like this:
from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    "example_using_k8s_executor_new",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "zhangxiao",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 5),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
        # Per-task pod resources for the KubernetesExecutor
        "executor_config": {
            "KubernetesExecutor": {
                "request_cpu": "200m",
                "limit_cpu": "200m",
                "request_memory": "500Mi",
                "limit_memory": "500Mi",
            }
        },
    },
)
the scheduler raises this exception:
[2020-12-10 10:02:08,142] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
    self._execute_helper()
  File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1453, in _execute_helper
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1515, in _validate_and_run_task_instances
    self.executor.heartbeat()
  File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
    self.trigger_tasks(open_slots)
  File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 155, in trigger_tasks
    executor_config=simple_ti.executor_config)
  File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 803, in execute_async
    self.task_queue.put((key, command, kube_executor_config))
  File "", line 2, in put
  File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/managers.py", line 756, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
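The last frames show the tuple passed to task_queue.put() being pickled, and pickle refusing an open text file handle somewhere inside it. The sketch below reproduces that class of error outside Airflow. It is only an illustration: the payload contents (the "task-key" string, the command list, the "log" key) are hypothetical stand-ins, not what Airflow actually puts on the queue, and the exact wording of the message differs between Python versions.

```python
import os
import pickle


def try_pickle(obj):
    """Return the TypeError message if obj cannot be pickled, else None."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# Hypothetical payload shaped like (key, command, config); the open file
# handle stands in for whatever unpicklable object ended up in the real one.
handle = open(os.devnull, "w")
payload = ("task-key", ["some", "command"], {"log": handle})

message = try_pickle(payload)
handle.close()

# The message names the offending type, e.g. '_io.TextIOWrapper'.
print(message)
```

A payload made only of strings, numbers, lists and dicts pickles without error, so the failure points at an object holding a file handle rather than at the queue itself.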
But if I remove executor_config from the DAG's default_args, the scheduler
works:
dag = DAG(
    "example_using_k8s_executor_new",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "zhangxiao",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 5),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)
Why does adding executor_config cause this error?
Any help would be appreciated.
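One way to narrow this down is to check which value in a nested structure pickle rejects. The helper below is a minimal sketch, not part of Airflow: find_unpicklable is a hypothetical name, and it only walks dicts, lists and tuples. Run against the executor_config from this issue it reports nothing, which would suggest (though this is not confirmed here) that the unpicklable object is introduced later, in whatever the executor derives from that dict before calling task_queue.put().

```python
import pickle


def find_unpicklable(obj, path="root"):
    """Return (path, exception name) for each leaf value pickle rejects."""
    bad = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            bad.extend(find_unpicklable(value, f"{path}[{key!r}]"))
    elif isinstance(obj, (list, tuple)):
        for index, value in enumerate(obj):
            bad.extend(find_unpicklable(value, f"{path}[{index}]"))
    else:
        try:
            pickle.dumps(obj)
        except Exception as exc:  # report any failure, not only TypeError
            bad.append((path, type(exc).__name__))
    return bad


# The executor_config from the DAG above: plain strings only.
executor_config = {
    "KubernetesExecutor": {
        "request_cpu": "200m",
        "limit_cpu": "200m",
        "request_memory": "500Mi",
        "limit_memory": "500Mi",
    }
}

print(find_unpicklable(executor_config))
```

The same helper could be pointed at the object the executor actually enqueues, for example from a debugger, to see which attribute carries the file handle.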