atul-astronomer opened a new issue, #47349:
URL: https://github.com/apache/airflow/issues/47349

   ### Apache Airflow version
   
   3.0.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   The scheduler crashes when running a DAG that uses PythonVirtualenvOperator.
   
   **Error**:
   
   ```text
     File "/opt/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
       callback()
     File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/jobs/job.py", line 342, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File "/opt/airflow/airflow/jobs/job.py", line 371, in execute_job
       ret = execute_callable()
     File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 926, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1051, in _run_scheduler_loop
       executor.heartbeat()
     File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/executors/base_executor.py", line 252, in heartbeat
       self.trigger_tasks(open_slots)
     File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/executors/base_executor.py", line 409, in trigger_tasks
       self._process_workloads(workloads)  # type: ignore[attr-defined]
     File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 281, in _process_workloads
       self._send_tasks(tasks)
     File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 290, in _send_tasks
       key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
     File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 329, in _send_tasks_to_celery
       return list(map(send_task_to_executor, task_tuples_to_send))
     File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py", line 266, in send_task_to_executor
       args = (args.model_dump_json(),)
     File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 477, in model_dump_json
       return self.__pydantic_serializer__.to_json(
   pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'kubernetes.client.models.v1_pod.V1Pod'>
   ```
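
The last frames point at a general failure class: a JSON serializer meeting an object it has no encoder for. The sketch below reproduces that pattern with the standard library only, as an illustration rather than the actual Airflow code path. `StubPod` is a hypothetical stand-in for `kubernetes.client.models.V1Pod`, `to_json_safe` is a hypothetical helper, and `json.dumps` stands in for pydantic's `model_dump_json`, so the exception type differs from the one in the traceback.

```python
import json


class StubPod:
    """Hypothetical stand-in for kubernetes.client.models.V1Pod."""

    def __init__(self, name):
        self.name = name

    def to_dict(self):
        # Assumption: mimics the idea of a model exposing a dict view of itself.
        return {"metadata": {"name": self.name}}


def to_json_safe(value):
    """Recursively replace objects that expose to_dict() with plain data."""
    if hasattr(value, "to_dict"):
        return to_json_safe(value.to_dict())
    if isinstance(value, dict):
        return {key: to_json_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_safe(item) for item in value]
    return value


# Same shape as the executor_config in the reproduction DAG.
executor_config = {"pod_override": StubPod("base")}

try:
    json.dumps(executor_config)
    failed = False
except TypeError as exc:
    # json raises TypeError for unknown types; pydantic raises
    # PydanticSerializationError in the analogous situation.
    failed = True
    error_message = str(exc)

# Converting to plain data first lets serialization succeed.
safe_json = json.dumps(to_json_safe(executor_config))
```

Whether the workload should carry a converted `pod_override` or skip it entirely is a design question for the executor; the sketch only shows why a raw object in `executor_config` cannot pass through a JSON dump unchanged.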
   
   ### What you think should happen instead?
   
   PythonVirtualenvOperator should not cause the scheduler to crash.
   
   ### How to reproduce
   
   Run the below DAG:
   
   ```python
   from airflow.models import DAG
   from airflow.providers.standard.operators.python import PythonVirtualenvOperator
   from pendulum import today
   from kubernetes.client import models as k8s
   
   def callable_virtualenv():
       """
       Example function that will be performed in a virtual environment.
   
       Importing inside the function, rather than at the module level, ensures
       that the library is not imported before it is installed.
       """
       from time import sleep
   
       from colorama import Back, Fore, Style
   
       print(Fore.RED + "some red text")
       print(Back.GREEN + "and with a green background")
       print(Style.DIM + "and in dim text")
       print(Style.RESET_ALL)
       for _ in range(10):
           print(Style.DIM + "Please wait...", flush=True)
           sleep(10)
       print("Finished")
   
   
   with DAG(
       dag_id="virtualenv_python_operator",
       default_args={"owner": "airflow"},
       schedule=None,
       start_date=today('UTC').add(days=-2),
       tags=["core"],
   ) as dag:
   
       task = PythonVirtualenvOperator(
           task_id="virtualenv_python",
           python_callable=callable_virtualenv,
           requirements=["colorama==0.4.0"],
           system_site_packages=False,
           executor_config={
               "pod_override": k8s.V1Pod(
                   spec=k8s.V1PodSpec(
                       containers=[
                           k8s.V1Container(
                               name="base",
                               resources=k8s.V1ResourceRequirements(
                                   requests={
                                       "cpu": "100m",
                                       "memory": "384Mi",
                                   },
                                   limits={
                                       "cpu": 1,
                                       "memory": "500Mi",
                                   }
                               )
                           )
                       ]
                   )
               )
           }
       )
   
   ``` 
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

