amoghrajesh commented on code in PR #47644:
URL: https://github.com/apache/airflow/pull/47644#discussion_r2004820685


##########
airflow/version_compat.py:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   No need to have this file



##########
providers/apache/hive/src/airflow/providers/apache/hive/transfers/hive_to_mysql.py:
##########
@@ -26,7 +26,7 @@
 from airflow.models import BaseOperator
 from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
 from airflow.providers.mysql.hooks.mysql import MySqlHook
-from airflow.utils.operator_helpers import context_to_airflow_vars
+from airflow.sdk.execution_time.context import context_to_airflow_vars

Review Comment:
   Need version check here



##########
providers/apache/hive/tests/unit/apache/hive/transfers/test_hive_to_mysql.py:
##########
@@ -25,9 +25,15 @@
 
 from airflow.providers.apache.hive.transfers.hive_to_mysql import 
HiveToMySqlOperator
 from airflow.utils import timezone
-from airflow.utils.operator_helpers import context_to_airflow_vars
 from unit.apache.hive import MockHiveServer2Hook, MockMySqlHook, 
TestHiveEnvironment
 
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk.execution_time.context import context_to_airflow_vars
+else:
+    from airflow.utils.operator_helpers import context_to_airflow_vars  # 
type: ignore[no-redef, attr-defined]

Review Comment:
   Yeah this is good



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -411,3 +448,70 @@ def context_update_for_unmapped(context: Context, task: 
BaseOperator) -> None:
     context["params"] = process_params(
         context["dag"], task, context["dag_run"].conf, suppress_exception=False
     )
+
+
+def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: 
bool = False) -> dict[str, str]:
+    """
+    Return values used to externally reconstruct relations between dags, 
dag_runs, tasks and task_instances.
+
+    Given a context, this function provides a dictionary of values that can be 
used to
+    externally reconstruct relations between dags, dag_runs, tasks and 
task_instances.
+    Default to abc.def.ghi format and can be made to ABC_DEF_GHI format if
+    in_env_var_format is set to True.
+
+    :param context: The context for the task_instance of interest.
+    :param in_env_var_format: If returned vars should be in ABC_DEF_GHI format.
+    :return: task_instance context as dict.
+    """
+    from airflow import settings

Review Comment:
   ```suggestion
       from airflow import settings
       from datetime import datetime
   ```



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -18,6 +18,7 @@
 
 import contextlib
 from collections.abc import Generator, Iterator, Mapping
+from datetime import datetime

Review Comment:
   ```suggestion
   ```



##########
providers/apache/hive/src/airflow/providers/apache/hive/version_compat.py:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   No need to introduce a new file since we now have the compat provider to help us.
   File: 
https://github.com/apache/airflow/blob/main/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
   
   And example usage: 
https://github.com/apache/airflow/blob/857d14b34d393000d5d23f4acc9264298688c6dd/providers/yandex/src/airflow/providers/yandex/links/yq.py#L31-L38



##########
providers/apache/hive/src/airflow/providers/apache/hive/transfers/hive_to_samba.py:
##########
@@ -26,7 +26,7 @@
 from airflow.models import BaseOperator
 from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
 from airflow.providers.samba.hooks.samba import SambaHook
-from airflow.utils.operator_helpers import context_to_airflow_vars
+from airflow.sdk.execution_time.context import context_to_airflow_vars

Review Comment:
   Need version check here



##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -916,6 +916,34 @@ def test_run_with_inlets_and_outlets(
     mock_supervisor_comms.send_request.assert_any_call(msg=last_expected_msg, 
log=mock.ANY)
 
 
+@mock.patch("airflow.sdk.execution_time.task_runner.context_to_airflow_vars")
+@mock.patch.dict(os.environ, {}, clear=True)
+def test_execute_task_exports_env_vars(
+    mock_context_to_airflow_vars, create_runtime_ti, mock_supervisor_comms
+):
+    """Test that _execute_task exports airflow context to environment 
variables."""
+
+    def test_function():
+        return "test function"
+
+    task = PythonOperator(
+        task_id="test_task",
+        python_callable=test_function,
+    )
+
+    ti = create_runtime_ti(task=task, dag_id="dag_with_env_vars")
+    mock_supervisor_comms.get_message.return_value = OKResponse(
+        ok=True,
+    )

Review Comment:
   You shouldn't need this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to