amoghrajesh opened a new pull request, #47548:
URL: https://github.com/apache/airflow/pull/47548
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of an existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
We recently ran into this issue with the Celery executor in #47349, and as part of that effort we raised https://github.com/apache/airflow/issues/47377 to potentially handle custom executors using the Task SDK.

`executor_config` is a property that acts as a "precondition" of a task workload: it is consumed to set up the environment the task runs in, so it needn't (and shouldn't) be passed on to the executor.

Consider a task like this one:
```python
from kubernetes.client import models as k8s

from airflow.providers.standard.operators.python import PythonVirtualenvOperator

task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={
                                "cpu": "100m",
                                "memory": "384Mi",
                            },
                            limits={
                                "cpu": 1,
                                "memory": "500Mi",
                            },
                        ),
                    )
                ]
            )
        )
    },
)
```
Error before the fix:
```
    callback()
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/opt/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 101, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 342, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 371, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 935, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job_runner.py", line 1067, in _run_scheduler_loop
    executor.heartbeat()
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 250, in heartbeat
    self.trigger_tasks(open_slots)
  File "/opt/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/executors/base_executor.py", line 407, in trigger_tasks
    self._process_workloads(workloads)  # type: ignore[attr-defined]
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 281, in _process_workloads
    self._send_tasks(tasks)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 290, in _send_tasks
    key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor.py", line 329, in _send_tasks_to_celery
    return list(map(send_task_to_executor, task_tuples_to_send))
  File "/opt/airflow/providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py", line 267, in send_task_to_executor
    args = (args.model_dump_json(),)
  File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 477, in model_dump_json
    return self.__pydantic_serializer__.to_json(
pydantic_core._pydantic_core.PydanticSerializationError: Unable to serialize unknown type: <class 'kubernetes.client.models.v1_pod.V1Pod'>
```
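The crash happens because the serialized workload still carries `executor_config`, which can hold objects pydantic cannot serialize (here a `k8s.V1Pod`). As a rough sketch of the idea, keeping such a field out of the payload in pydantic v2 can be done with `Field(exclude=True)`. The `TaskWorkload` model below is illustrative, not Airflow's actual workload class:

```python
from typing import Any

from pydantic import BaseModel, Field


class TaskWorkload(BaseModel):
    task_id: str
    # exclude=True drops the field from model_dump()/model_dump_json(),
    # so objects pydantic cannot serialize never reach the executor payload.
    executor_config: Any = Field(default=None, exclude=True)


# Even with a non-serializable value set, serialization succeeds:
workload = TaskWorkload(task_id="virtualenv_python", executor_config=object())
print(workload.model_dump_json())  # -> {"task_id":"virtualenv_python"}
```

The field stays available in-process (e.g. for the scheduler to build the pod spec) but never crosses the serialization boundary to the executor.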
After the fix, the DAG runs fine, and the scheduler no longer crashes.
<!-- Please keep an empty line above the dashes. -->
---
**^ Add meaningful description above**
Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information.
In case of fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in a
newsfragment file, named `{pr_number}.significant.rst` or
`{issue_number}.significant.rst`, in
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]