amoghrajesh opened a new pull request, #47375:
URL: https://github.com/apache/airflow/pull/47375

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   closes: #47349
   
   Ideally, it isn't correct to pass `executor_config` with `pod_override` for 
CE as it makes no sense to do that. It is generally a config for KE / EDGE etc. 
When `executor_config` is passed to an executor, with AF 3, it is present as 
part of the TI model & is serialised as a workload and sent to the supervisor 
to run. We do not need to ever pass it down to the supervise function as it 
these are "decorators" used to set the "environment" for  execution for the 
task. Example: they are used to pick right image, volumes etc for KE.
   
   Ignoring the config for executor_config while performing a serialisation.
   
   
   ## Testing:
   
   - Used the same DAG as earlier:
   ```
   from airflow.models import DAG
   from airflow.providers.standard.operators.python import 
PythonVirtualenvOperator
   from pendulum import today
   from kubernetes.client import models as k8s
   
   def callable_virtualenv():
       """
       Example function that will be performed in a virtual environment.
   
       Importing at the module level ensures that it will not attempt to import 
the
       library before it is installed.
       """
       from time import sleep
   
       from colorama import Back, Fore, Style
   
       print(Fore.RED + "some red text")
       print(Back.GREEN + "and with a green background")
       print(Style.DIM + "and in dim text")
       print(Style.RESET_ALL)
       for _ in range(10):
           print(Style.DIM + "Please wait...", flush=True)
           sleep(10)
       print("Finished")
   
   
   with DAG(
       dag_id="virtualenv_python_operator",
       default_args={"owner": "airflow"},
       schedule=None,
       start_date=today('UTC').add(days=-2),
       tags=["core"],
   ) as dag:
   
       task = PythonVirtualenvOperator(
           task_id="virtualenv_python",
           python_callable=callable_virtualenv,
           requirements=["colorama==0.4.0"],
           system_site_packages=False,
           executor_config={
               "pod_override": k8s.V1Pod(
                   spec=k8s.V1PodSpec(
                       containers=[
                           k8s.V1Container(
                               name="base",
                               resources=k8s.V1ResourceRequirements(
                                   requests={
                                       "cpu": "100m",
                                       "memory": "384Mi",
                                   },
                                   limits={
                                       "cpu": 1,
                                       "memory": "500Mi",
                                   }
                               )
                           )
                       ]
                   )
               )
           }
       )
   ```
   
   <img width="1726" alt="image" 
src="https://github.com/user-attachments/assets/cbb31186-b40f-4368-a867-ed99f0b4ef6e";
 />
   
   
   Logs:
   ```
   ::group::Log message source details 
sources=["/root/airflow/logs/dag_id=virtualenv_python_operator/run_id=manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt/task_id=virtualenv_python/attempt=1.log"]
   
   ::endgroup::
   
   [2025-03-05T07:31:44.480947Z] DEBUG - Loading plugins 
logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:44.481644Z] DEBUG - Loading plugins from directory: 
/files/plugins logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:44.481732Z] DEBUG - Note: Loading plugins from examples as 
well: /files/plugins logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:44.485294Z] WARNING - /files/plugins/my_xcom.py:13: 
SAWarning: This declarative base already contains a class with the same class 
name and module name as my_xcom.JSONFileXComBackend, and will be replaced in 
the string-lookup table. class JSONFileXComBackend(BaseXCom): 
logger="py.warnings"
   
   [2025-03-05T07:31:44.487014Z] ERROR - Failed to import plugin 
/files/plugins/my_plugin.py logger="airflow.plugins_manager" 
error_detail=[{"exc_type":"ModuleNotFoundError","exc_value":"No module named 
'airflow.sdk.definitions.baseoperatorlink'","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/airflow/plugins_manager.py","lineno":290,"name":"load_plugins_from_plugin_directory"},{"filename":"<frozen
 
importlib._bootstrap_external>","lineno":850,"name":"exec_module"},{"filename":"<frozen
 
importlib._bootstrap>","lineno":228,"name":"_call_with_frames_removed"},{"filename":"/files/plugins/my_plugin.py","lineno":2,"name":"<module>"}]}]
   
   [2025-03-05T07:31:45.017015Z] DEBUG - Loading plugins from entrypoints 
logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.017504Z] DEBUG - Importing entry_point plugin hive 
logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.024081Z] DEBUG - Importing entry_point plugin 
databricks_workflow logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.039245Z] DEBUG - Importing entry_point plugin 
edge_executor logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.045887Z] DEBUG - Importing entry_point plugin 
openlineage logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.120448Z] DEBUG - Importing entry_point plugin 
databricks_workflow logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.121064Z] DEBUG - Importing entry_point plugin hive 
logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.121515Z] DEBUG - Importing entry_point plugin 
edge_executor logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.121877Z] DEBUG - Importing entry_point plugin 
openlineage logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.122289Z] DEBUG - Loading 7 plugin(s) took 641.19 
seconds logger="airflow.plugins_manager"
   
   [2025-03-05T07:31:45.122583Z] DEBUG - Calling 'on_starting' with 
{'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object 
at 0xffff877ea6d0>} logger="airflow.listeners.listener"
   
   [2025-03-05T07:31:45.122827Z] DEBUG - Hook impls: [] 
logger="airflow.listeners.listener"
   
   [2025-03-05T07:31:45.122926Z] DEBUG - Result from 'on_starting': [] 
logger="airflow.listeners.listener"
   
   [2025-03-05T07:31:45.124100Z] INFO - DAG bundles loaded: dags-folder, 
example_dags logger="airflow.dag_processing.bundles.manager.DagBundlesManager"
   
   [2025-03-05T07:31:45.124317Z] INFO - Filling up the DagBag from 
/files/dags/dags/bug-ce.py logger="airflow.models.dagbag.DagBag"
   
   [2025-03-05T07:31:45.124581Z] DEBUG - Importing /files/dags/dags/bug-ce.py 
logger="airflow.models.dagbag.DagBag"
   
   [2025-03-05T07:31:45.129295Z] DEBUG - Loaded DAG <DAG: 
virtualenv_python_operator> logger="airflow.models.dagbag.DagBag"
   
   [2025-03-05T07:31:45.129575Z] DEBUG - DAG file parsed file="dags/bug-ce.py" 
logger="task"
   
   [2025-03-05T07:31:45.187530Z] DEBUG - Sending request 
json="{\"rendered_fields\":{\"op_args\":\"()\",\"templates_dict\":null,\"requirements\":[\"colorama==0.4.0\"],\"index_urls\":null,\"venv_cache_path\":null,\"op_kwargs\":{}},\"type\":\"SetRenderedFields\"}\n"
 logger="task"
   
   [2025-03-05T07:31:45.187852Z] DEBUG - Calling 'on_task_instance_running' 
with {'previous_state': <TaskInstanceState.QUEUED: 'queued'>, 'task_instance': 
RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'), 
task_id='virtualenv_python', dag_id='virtualenv_python_operator', 
run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1, 
map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator): 
virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7, 
31, 44, 441399, tzinfo=TzInfo(UTC)))} logger="airflow.listeners.listener"
   
   [2025-03-05T07:31:45.187906Z] DEBUG - Hook impls: [<HookImpl 
plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 
'airflow.example_dags.plugins.event_listener' from 
'/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] 
logger="airflow.listeners.listener"
   
   [2025-03-05T07:31:45.188037Z] DEBUG - Result from 
'on_task_instance_running': [] logger="airflow.listeners.listener"
   
   [2025-03-05T07:31:45.197909Z] WARNING - PythonVirtualenvOperator.execute 
cannot be called outside TaskInstance! 
logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"
   
   [2025-03-05T07:31:45.197981Z] WARNING - PythonVirtualenvOperator.execute 
cannot be called outside TaskInstance! 
logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"
   
   [2025-03-05T07:31:45.198578Z] INFO - Executing cmd: uv venv --allow-existing 
--seed --python python /tmp/venv45hz_3xa logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:45.206278Z] INFO - Output: 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:45.206743Z] INFO - Task instance is in running state 
chan="stdout" logger="task"
   
   [2025-03-05T07:31:45.206764Z] INFO - Previous state of the Task instance: 
queued chan="stdout" logger="task"
   
   [2025-03-05T07:31:45.206777Z] INFO - Current task name:virtualenv_python 
chan="stdout" logger="task"
   
   [2025-03-05T07:31:45.206793Z] INFO - Dag name:virtualenv_python_operator 
chan="stdout" logger="task"
   
   [2025-03-05T07:31:45.248073Z] INFO - Using CPython 3.9.21 interpreter at: 
/usr/local/bin/python logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:45.248171Z] INFO - Creating virtual environment with seed 
packages at: /tmp/venv45hz_3xa logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.931764Z] INFO - + pip==25.0.1 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.931899Z] INFO - + setuptools==75.8.2 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.931932Z] INFO - + wheel==0.45.1 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.931995Z] INFO - Activate with: source 
/tmp/venv45hz_3xa/bin/activate logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.934027Z] INFO - Executing cmd: uv pip install --python 
/tmp/venv45hz_3xa/bin/python -r /tmp/venv45hz_3xa/requirements.txt 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.941631Z] INFO - Output: 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:46.993076Z] INFO - Using Python 3.9.21 environment at: 
/tmp/venv45hz_3xa logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.166176Z] INFO - Resolved 1 package in 171ms 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.189297Z] INFO - Prepared 1 package in 22ms 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.193621Z] INFO - Installed 1 package in 4ms 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.193748Z] INFO - + colorama==0.4.0 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.205122Z] INFO - Executing cmd: 
/tmp/venv45hz_3xa/bin/python /tmp/venv-callxpo2rnmx/script.py 
/tmp/venv-callxpo2rnmx/script.in /tmp/venv-callxpo2rnmx/script.out 
/tmp/venv-callxpo2rnmx/string_args.txt /tmp/venv-callxpo2rnmx/termination.log 
/tmp/venv-callxpo2rnmx/airflow_context.json logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.214445Z] INFO - Output: 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.260343Z] INFO - some red text 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.260438Z] INFO - and with a green background 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.260467Z] INFO - and in dim text 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.260488Z] INFO -  
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:47.260518Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:31:57.274693Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:32:07.283704Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:32:17.298063Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:32:27.304833Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:32:37.325562Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:32:47.346042Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:32:57.365319Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:33:07.376676Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:33:17.396723Z] INFO - Please wait... 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:33:58.405640Z] INFO - Finished 
logger="airflow.utils.process_utils"
   
   [2025-03-05T07:33:58.459264Z] INFO - Done. Returned value was: None 
logger="airflow.task.operators.airflow.providers.standard.operators.python.PythonVirtualenvOperator"
   
   [2025-03-05T07:33:58.459744Z] DEBUG - Sending request 
json="{\"state\":\"success\",\"end_date\":\"2025-03-05T07:33:58.459592Z\",\"task_outlets\":[],\"outlet_events\":[],\"type\":\"SucceedTask\"}\n"
 logger="task"
   
   [2025-03-05T07:33:58.459865Z] DEBUG - Running finalizers 
ti="RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'), 
task_id='virtualenv_python', dag_id='virtualenv_python_operator', 
run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1, 
map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator): 
virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7, 
31, 44, 441399, tzinfo=TzInfo(UTC)))" logger="task"
   
   [2025-03-05T07:33:58.461605Z] DEBUG - Calling 'on_task_instance_success' 
with {'previous_state': <TaskInstanceState.RUNNING: 'running'>, 
'task_instance': 
RuntimeTaskInstance(id=UUID('01956537-adbd-7b50-80fe-583631d0f323'), 
task_id='virtualenv_python', dag_id='virtualenv_python_operator', 
run_id='manual__2025-03-05T07:31:42.634180+00:00_Vfvn3SQt', try_number=1, 
map_index=-1, hostname='12b15570bb78', task=<Task(PythonVirtualenvOperator): 
virtualenv_python>, max_tries=0, start_date=datetime.datetime(2025, 3, 5, 7, 
31, 44, 441399, tzinfo=TzInfo(UTC)))} logger="airflow.listeners.listener"
   
   [2025-03-05T07:33:58.461687Z] DEBUG - Hook impls: [<HookImpl 
plugin_name='airflow.example_dags.plugins.event_listener', plugin=<module 
'airflow.example_dags.plugins.event_listener' from 
'/opt/airflow/airflow/example_dags/plugins/event_listener.py'>>] 
logger="airflow.listeners.listener"
   
   [2025-03-05T07:33:58.462171Z] DEBUG - Result from 
'on_task_instance_success': [] logger="airflow.listeners.listener"
   
   [2025-03-05T07:33:58.462226Z] DEBUG - Calling 'before_stopping' with 
{'component': <airflow.sdk.execution_time.task_runner.TaskRunnerMarker object 
at 0xffff9f340fd0>} logger="airflow.listeners.listener"
   
   [2025-03-05T07:33:58.462252Z] DEBUG - Hook impls: [] 
logger="airflow.listeners.listener"
   
   [2025-03-05T07:33:58.462275Z] DEBUG - Result from 'before_stopping': [] 
logger="airflow.listeners.listener"
   
   [2025-03-05T07:33:58.488373Z] INFO - Task instance in success state 
chan="stdout" logger="task"
   
   [2025-03-05T07:33:58.488395Z] INFO - Previous state of the Task instance: 
running chan="stdout" logger="task"
   
   [2025-03-05T07:33:58.488409Z] INFO - Task 
operator:<Task(PythonVirtualenvOperator): virtualenv_python> chan="stdout" 
logger="task"
   
   [2025-03-05T07:33:58.490037Z] WARNING - Airflow core logging is not using a 
FileTaskHandler, can't upload logs to remote handler="<class 'NoneType'>" 
logger="task"
   ```
   
   
   <!-- Please keep an empty line above the dashes. -->
   ---
   **^ Add meaningful description above**
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a 
newsfragment file, named `{pr_number}.significant.rst` or 
`{issue_number}.significant.rst`, in 
[newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to