amoghrajesh opened a new pull request, #47375:
URL: https://github.com/apache/airflow/pull/47375
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
Thank you for contributing! Please make sure that your code changes
are covered with tests. And in case of new features or big changes
remember to adjust the documentation.
Feel free to ping committers for the review!
In case of an existing issue, reference it using one of the following:
closes: #ISSUE
related: #ISSUE
How to write a good git commit message:
http://chris.beams.io/posts/git-commit/
-->
closes: #47349
Ideally, it isn't correct to pass `executor_config` with `pod_override` for
CE as it makes no sense to do that. It is generally a config for KE / EDGE etc.
When `executor_config` is passed to an executor, with AF 3, it is present as
part of the TI model & is serialised as a workload and sent to the supervisor
to run. We do not need to ever pass it down to the supervise function as it
these are "decorators" used to set the "environment" for execution for the
task. Example: they are used to pick right image, volumes etc for KE.
Ignoring the config for executor_config while performing a serialisation.
## Testing:
- Used the same DAG as earlier:
```
from airflow.models import DAG
from airflow.providers.standard.operators.python import
PythonVirtualenvOperator
from pendulum import today
from kubernetes.client import models as k8s
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import
the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(10):
print(Style.DIM + "Please wait...", flush=True)
sleep(10)
print("Finished")
with DAG(
dag_id="virtualenv_python_operator",
default_args={"owner": "airflow"},
schedule=None,
start_date=today('UTC').add(days=-2),
tags=["core"],
) as dag:
task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s.V1ResourceRequirements(
requests={
"cpu": "100m",
"memory": "384Mi",
},
limits={
"cpu": 1,
"memory": "500Mi",
}
)
)
]
)
)
}
)
```
<img width="1726" alt="image"
src="https://github.com/user-attachments/assets/cbb31186-b40f-4368-a867-ed99f0b4ef6e"
/>
Logs:
```
::group::Log message source details
sources=["/root/airflow/logs/dag_id=virtualenv_python_operator/run_id=manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt/task_id=virtualenv_python/attempt=1.log"]
::endgroup::
[2025-03-05T07:31:44.480947Z] DEBUG - Loading plugins
logger="airflow.plugins_manager"
[2025-03-05T07:31:44.481644Z] DEBUG - Loading plugins from directory:
/files/plugins logger="airflow.plugins_manager"
[2025-03-05T07:31:44.481732Z] DEBUG - Note: Loading plugins from examples as
well: /files/plugins logger="airflow.plugins_manager"
[2025-03-05T07:31:44.485294Z] WARNING - /files/plugins/my_xcom.py:13:
SAWarning: This declarative base already contains a class with the same class
name and module name as my_xcom.JSONFileXComBackend, and will be replaced in
the string-lookup table. class JSONFileXComBackend(BaseXCom):
logger="py.warnings"
[2025-03-05T07:31:44.487014Z] ERROR - Failed to import plugin
/files/plugins/my_plugin.py logger="airflow.plugins_manager"
error_detail=[{"exc_type":"ModuleNotFoundError","exc_value":"No module named
'airflow.sdk.definitions.baseoperatorlink'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/airflow/plugins_manager.py","lineno":290,"name":"load_plugins_from_plugin_directory"},{"filename":"<frozen
importlib._bootstrap_external>","lineno":850,"name":"exec_module"},{"filename":"<frozen
importlib._bootstrap>","lineno":228,"name":"_call_with_frames_removed"},{"filename":"/files/plugins/my_plugin.py","lineno":2,"name":"<module>"}]}]
[2025-03-05T07:31:45.017015Z] DEBUG - Loading plugins from entrypoints
logger="airflow.plugins_manager"
[2025-03-05T07:31:45.017504Z] DEBUG - Importing entry_point plugin hive
logger="airflow.plugins_manager"
[2025-03-05T07:31:45.024081Z] DEBUG - Importing entry_point plugin
databricks_workflow logger="airflow.plugins_manager"
[2025-03-05T07:31:45.039245Z] DEBUG - Importing entry_point plugin
edge_executor logger="airflow.plugins_manager"
[2025-03-05T07:31:45.045887Z] DEBUG - Importing entry_point plugin
openlineage logger="airflow.plugins_manager"
[2025-03-05T07:31:45.120448Z] DEBUG - Importing entry_point plugin
databricks_workflow logger="airflow.plugins_manager"
[2025-03-05T07:31:45.121064Z] DEBUG - Importing entry_point plugin hive
logger="airflow.plugins_manager"
[2025-03-05T07:31:45.121515Z] DEBUG - Importing entry_point plugin
edge_executor logger="airflow.plugins_manager"
[2025-03-05T07:31:45.121877Z] DEBUG - Importing entry_point plugin
openlineage logger="airflow.plugins_manager"
[2025-03-05T07:31:45.122289Z] DEBUG - Loading 7 plugin(s) took 641.19
seconds logger="airflow.plugins_manager"
[2025-03-05T07:31:45.122583Z] DEBUG - Calling 'on_starting' with
{'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object
at 0xffff877ea6d0>} logger="airflow.listeners.listener"
[2025-03-05T07:31:45.122827Z] DEBUG - Hook impls: []
logger="airflow.listeners.listener"
[2025-03-05T07:31:45.122926Z] DEBUG - Result from 'on_starting': []
logger="airflow.listeners.listener"
[2025-03-05T07:31:45.124100Z] INFO - DAG bundles loaded: dags-folder,
example_dags logger="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-03-05T07:31:45.124317Z] INFO - Filling up the DagBag from
/files/dags/dags/bug-ce.py logger="airflow.models.dagbag.DagBag"
[2025-03-05T07:31:45.124581Z] DEBUG - Importing /files/dags/dags/bug-ce.py
logger="airflow.models.dagbag.DagBag"
[2025-03-05T07:31:45.129295Z] DEBUG - Loaded DAG <DAG:
virtualenv_python_operator> logger="airflow.models.dagbag.DagBag"
[2025-03-05T07:31:45.129575Z] DEBUG - DAG file parsed file="dags/bug-ce.py"
logger="task"
[2025-03-05T07:31:45.187530Z] DEBUG - Sending request
json="{\"rendered_fields\":{\"op_args\":\"()\",\"templates_dict\":null,\"requirements\":[\"colorama==0.4.0\"],\"index_urls\":null,\"venv_cache_path\":null,\"op_kwargs\":{}},\"type\":\"SetRenderedFields\"}\n"
logger="task"
[2025-03-05T07:31:45.187852Z] DEBUG - Calling 'on_task_instance_running'
with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance':
RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'),
task_id='virtualenv_python', dag_id='virtualenv_python_operator',
run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1,
map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator):
virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7,
31, 44, 441399, tzinfo=TzInfo(UTC)))} logger="airflow.listeners.listener"
[2025-03-05T07:31:45.187906Z] DEBUG - Hook impls: [<HookImpl
plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module
'airflow.example_dags.plugins.event_listener' from
'/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>]
logger="airflow.listeners.listener"
[2025-03-05T07:31:45.188037Z] DEBUG - Result from
'on_task_instance_running': [] logger="airflow.listeners.listener"
[2025-03-05T07:31:45.197909Z] WARNING - PythonVirtualenvOperator.execute
cannot be called outside TaskInstance!
logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"
[2025-03-05T07:31:45.197981Z] WARNING - PythonVirtualenvOperator.execute
cannot be called outside TaskInstance!
logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"
[2025-03-05T07:31:45.198578Z] INFO - Executing cmd: uv venv --allow-existing
--seed --python python /tmp/venv45hz_3xa logger="airflow.utils.process_utils"
[2025-03-05T07:31:45.206278Z] INFO - Output:
logger="airflow.utils.process_utils"
[2025-03-05T07:31:45.206743Z] INFO - Task instance is in running state
chan="stdout" logger="task"
[2025-03-05T07:31:45.206764Z] INFO - Previous state of the Task instance:
queued chan="stdout" logger="task"
[2025-03-05T07:31:45.206777Z] INFO - Current task name:virtualenv_python
chan="stdout" logger="task"
[2025-03-05T07:31:45.206793Z] INFO - Dag name:virtualenv_python_operator
chan="stdout" logger="task"
[2025-03-05T07:31:45.248073Z] INFO - Using CPython 3.9.21 interpreter at:
/usr/local/bin/python logger="airflow.utils.process_utils"
[2025-03-05T07:31:45.248171Z] INFO - Creating virtual environment with seed
packages at: /tmp/venv45hz_3xa logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.931764Z] INFO - + pip==25.0.1
logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.931899Z] INFO - + setuptools==75.8.2
logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.931932Z] INFO - + wheel==0.45.1
logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.931995Z] INFO - Activate with: source
/tmp/venv45hz_3xa/bin/activate logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.934027Z] INFO - Executing cmd: uv pip install --python
/tmp/venv45hz_3xa/bin/python -r /tmp/venv45hz_3xa/requirements.txt
logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.941631Z] INFO - Output:
logger="airflow.utils.process_utils"
[2025-03-05T07:31:46.993076Z] INFO - Using Python 3.9.21 environment at:
/tmp/venv45hz_3xa logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.166176Z] INFO - Resolved 1 package in 171ms
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.189297Z] INFO - Prepared 1 package in 22ms
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.193621Z] INFO - Installed 1 package in 4ms
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.193748Z] INFO - + colorama==0.4.0
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.205122Z] INFO - Executing cmd:
/tmp/venv45hz_3xa/bin/python /tmp/venv-callxpo2rnmx/script.py
/tmp/venv-callxpo2rnmx/script.in /tmp/venv-callxpo2rnmx/script.out
/tmp/venv-callxpo2rnmx/string_args.txt /tmp/venv-callxpo2rnmx/termination.log
/tmp/venv-callxpo2rnmx/airflow_context.json logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.214445Z] INFO - Output:
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.260343Z] INFO - [31msome red text
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.260438Z] INFO - [42mand with a green background
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.260467Z] INFO - [2mand in dim text
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.260488Z] INFO - [0m
logger="airflow.utils.process_utils"
[2025-03-05T07:31:47.260518Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:31:57.274693Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:32:07.283704Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:32:17.298063Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:32:27.304833Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:32:37.325562Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:32:47.346042Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:32:57.365319Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:33:07.376676Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:33:17.396723Z] INFO - [2mPlease wait...
logger="airflow.utils.process_utils"
[2025-03-05T07:33:58.405640Z] INFO - Finished
logger="airflow.utils.process_utils"
[2025-03-05T07:33:58.459264Z] INFO - Done. Returned value was: None
logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"
[2025-03-05T07:33:58.459744Z] DEBUG - Sending request
json="{\"state\":\"success\",\"end_date\":\"2025-03-05T07:33:58.459592Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n"
logger="task"
[2025-03-05T07:33:58.459865Z] DEBUG - Running finalizers
ti="RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'),
task_id='virtualenv_python', dag_id='virtualenv_python_operator',
run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1,
map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator):
virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7,
31, 44, 441399, tzinfo=TzInfo(UTC)))" logger="task"
[2025-03-05T07:33:58.461605Z] DEBUG - Calling 'on_task_instance_success'
with {'previous_state': <TaskInstanceState.RUNNING: 'running'>,
'task_instance':
RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'),
task_id='virtualenv_python', dag_id='virtualenv_python_operator',
run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1,
map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator):
virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7,
31, 44, 441399, tzinfo=TzInfo(UTC)))} logger="airflow.listeners.listener"
[2025-03-05T07:33:58.461687Z] DEBUG - Hook impls: [<HookImpl
plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module
'airflow.example_dags.plugins.event_listener' from
'/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>]
logger="airflow.listeners.listener"
[2025-03-05T07:33:58.462171Z] DEBUG - Result from
'on_task_instance_success': [] logger="airflow.listeners.listener"
[2025-03-05T07:33:58.462226Z] DEBUG - Calling 'before_stopping' with
{'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object
at 0xffff9f340fd0>} logger="airflow.listeners.listener"
[2025-03-05T07:33:58.462252Z] DEBUG - Hook impls: []
logger="airflow.listeners.listener"
[2025-03-05T07:33:58.462275Z] DEBUG - Result from 'before_stopping': []
logger="airflow.listeners.listener"
[2025-03-05T07:33:58.488373Z] INFO - Task instance in success state
chan="stdout" logger="task"
[2025-03-05T07:33:58.488395Z] INFO - Previous state of the Task instance:
running chan="stdout" logger="task"
[2025-03-05T07:33:58.488409Z] INFO - Task
operator:<Task(PythonVirtualenvOperator): virtualenv_python> chan="stdout"
logger="task"
[2025-03-05T07:33:58.490037Z] WARNING - Airflow core logging is not using a
FileTaskHandler, can't upload logs to remote handler="<class 'NoneType'>"
logger="task"
```
<!-- Please keep an empty line above the dashes. -->
---
**^ Add meaningful description above**
Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information.
In case of fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in a
newsfragment file, named `{pr_number}.significant.rst` or
`{issue_number}.significant.rst`, in
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]