zhangxiao696 opened a new issue #12980:
URL: https://github.com/apache/airflow/issues/12980


   When I define a DAG that runs on the KubernetesExecutor like this:
   from datetime import datetime, timedelta

   from airflow import DAG

   dag = DAG(
       "example_using_k8s_executor_new",
       schedule_interval="0 1 * * *",
       catchup=False,
       default_args={
           "owner": "zhangxiao",
           "depends_on_past": False,
           "start_date": datetime(2020, 12, 5),
           "email_on_failure": False,
           "email_on_retry": False,
           "retries": 2,
           "retry_delay": timedelta(seconds=30),
           "sla": timedelta(hours=23),
           # Pod resource requests and limits, passed to every task via default_args
           "executor_config": {
               "KubernetesExecutor": {
                   "request_cpu": "200m",
                   "limit_cpu": "200m",
                   "request_memory": "500Mi",
                   "limit_memory": "500Mi",
               }
           },
       },
   )
   
   the scheduler raises the following exception:
   [2020-12-10 10:02:08,142] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
   Traceback (most recent call last):
     File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
       self._execute_helper()
     File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1453, in _execute_helper
       if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
     File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1515, in _validate_and_run_task_instances
       self.executor.heartbeat()
     File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
       self.trigger_tasks(open_slots)
     File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 155, in trigger_tasks
       executor_config=simple_ti.executor_config)
     File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 803, in execute_async
       self.task_queue.put((key, command, kube_executor_config))
     File "", line 2, in put
     File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/managers.py", line 756, in _callmethod
       conn.send((self._id, methodname, args, kwds))
     File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/connection.py", line 206, in send
       self._send_bytes(_ForkingPickler.dumps(obj))
     File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
       cls(buf, protocol).dump(obj)
   TypeError: cannot serialize '_io.TextIOWrapper' object
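
   Note on the traceback: the last frames are inside multiprocessing, where the
   tuple passed to task_queue.put() is pickled before being sent to the manager
   process. The sketch below is not Airflow code. It only reproduces the same
   kind of TypeError with the standard library, under the assumption that some
   object reachable from the queued tuple holds an open text file handle. Which
   object that is in this setup cannot be determined from the report. The names
   try_pickle, plain_item and handle_item are made up for illustration, and the
   exact error wording differs between Python versions.

```python
import os
import pickle


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


# Plain data, such as the resource strings from executor_config, pickles fine.
plain_item = ("task_key", ["airflow", "run"], {"request_cpu": "200m"})
plain_error = try_pickle(plain_item)

# An open text file handle anywhere inside the item makes pickling fail.
# (Hypothetical stand-in: the real queued object is not shown in the report.)
with open(os.devnull, "w") as handle:
    handle_item = ("task_key", ["airflow", "run"], {"log": handle})
    handle_error = try_pickle(handle_item)

print("plain item error:", plain_error)
print("handle item error:", handle_error)
```

   If this assumption holds, the string values in executor_config are not
   themselves the problem; something built from them and placed on the queue
   would be what carries the unpicklable handle.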
   
   But if I remove executor_config from the DAG's default_args, the program 
works:
   dag = DAG(
       "example_using_k8s_executor_new",
       schedule_interval="0 1 * * *",
       catchup=False,
       default_args={
           "owner": "zhangxiao",
           "depends_on_past": False,
           "start_date": datetime(2020, 12, 5),
           "email_on_failure": False,
           "email_on_retry": False,
           "retries": 2,
           "retry_delay": timedelta(seconds=30),
           "sla": timedelta(hours=23),
       },
   )
   
   Why does adding executor_config cause this error? Please help.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

