amoghrajesh commented on code in PR #47644: URL: https://github.com/apache/airflow/pull/47644#discussion_r2004820685
########## airflow/version_compat.py: ########## @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one Review Comment: No need to have this file ########## providers/apache/hive/src/airflow/providers/apache/hive/transfers/hive_to_mysql.py: ########## @@ -26,7 +26,7 @@ from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook from airflow.providers.mysql.hooks.mysql import MySqlHook -from airflow.utils.operator_helpers import context_to_airflow_vars +from airflow.sdk.execution_time.context import context_to_airflow_vars Review Comment: Need version check here ########## providers/apache/hive/tests/unit/apache/hive/transfers/test_hive_to_mysql.py: ########## @@ -25,9 +25,15 @@ from airflow.providers.apache.hive.transfers.hive_to_mysql import HiveToMySqlOperator from airflow.utils import timezone -from airflow.utils.operator_helpers import context_to_airflow_vars from unit.apache.hive import MockHiveServer2Hook, MockMySqlHook, TestHiveEnvironment +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk.execution_time.context import context_to_airflow_vars +else: + from airflow.utils.operator_helpers import context_to_airflow_vars # type: ignore[no-redef, attr-defined] Review Comment: Yeah this is good ########## task-sdk/src/airflow/sdk/execution_time/context.py: ########## @@ -411,3 +448,70 @@ def context_update_for_unmapped(context: Context, task: BaseOperator) -> None: context["params"] = process_params( context["dag"], task, context["dag_run"].conf, suppress_exception=False ) + + +def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool = False) -> dict[str, str]: + """ + Return values used to externally reconstruct relations between dags, dag_runs, tasks and task_instances. + + Given a context, this function provides a dictionary of values that can be used to + externally reconstruct relations between dags, dag_runs, tasks and task_instances. + Default to abc.def.ghi format and can be made to ABC_DEF_GHI format if + in_env_var_format is set to True. + + :param context: The context for the task_instance of interest. + :param in_env_var_format: If returned vars should be in ABC_DEF_GHI format. + :return: task_instance context as dict. + """ + from airflow import settings Review Comment: ```suggestion from airflow import settings from datetime import datetime ``` ########## task-sdk/src/airflow/sdk/execution_time/context.py: ########## @@ -18,6 +18,7 @@ import contextlib from collections.abc import Generator, Iterator, Mapping +from datetime import datetime Review Comment: ```suggestion ``` ########## providers/apache/hive/src/airflow/providers/apache/hive/version_compat.py: ########## @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one Review Comment: No need to introduce a new file since we now have compat provider to help us. File: https://github.com/apache/airflow/blob/main/providers/common/compat/src/airflow/providers/common/compat/version_compat.py And example usage: https://github.com/apache/airflow/blob/857d14b34d393000d5d23f4acc9264298688c6dd/providers/yandex/src/airflow/providers/yandex/links/yq.py#L31-L38 ########## providers/apache/hive/src/airflow/providers/apache/hive/transfers/hive_to_samba.py: ########## @@ -26,7 +26,7 @@ from airflow.models import BaseOperator from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook from airflow.providers.samba.hooks.samba import SambaHook -from airflow.utils.operator_helpers import context_to_airflow_vars +from airflow.sdk.execution_time.context import context_to_airflow_vars Review Comment: Need version check here ########## task-sdk/tests/task_sdk/execution_time/test_task_runner.py: ########## @@ -916,6 +916,34 @@ def test_run_with_inlets_and_outlets( mock_supervisor_comms.send_request.assert_any_call(msg=last_expected_msg, log=mock.ANY) +@mock.patch("airflow.sdk.execution_time.task_runner.context_to_airflow_vars") +@mock.patch.dict(os.environ, {}, clear=True) +def test_execute_task_exports_env_vars( + mock_context_to_airflow_vars, create_runtime_ti, mock_supervisor_comms +): + """Test that _execute_task exports airflow context to environment variables.""" + + def test_function(): + return "test function" + + task = PythonOperator( + task_id="test_task", + python_callable=test_function, + ) + + ti = create_runtime_ti(task=task, dag_id="dag_with_env_vars") + mock_supervisor_comms.get_message.return_value = OKResponse( + ok=True, + ) Review Comment: You shoudlnt' need this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org