This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch sync_2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1eaf5a741d34832c820ba1f943c1c564e061dfd6
Author: Wei Lee <[email protected]>
AuthorDate: Thu Nov 14 08:36:49 2024 +0800

    feat(dataset): raise deprecation warning when accessing inlet or outlet events through str (#43922)
    
    this behavior will be removed in airflow 3: since assets have both name and uri
    attributes, it would be confusing to identify which attribute should be used to
    filter the right asset
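    
    a minimal before/after sketch of the change (the alias name and s3 path
    below are illustrative, not taken from this patch):
    
        # deprecated: string keys now emit a DeprecationWarning
        outlet_events["my-alias"].add(Dataset("s3://bucket/obj"))
    
        # preferred: index with the DatasetAlias / Dataset object directly
        outlet_events[DatasetAlias("my-alias")].add(Dataset("s3://bucket/obj"))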
---
 airflow/example_dags/example_dataset_alias.py      |  2 +-
 .../example_dataset_alias_with_no_taskflow.py      |  4 +++-
 airflow/example_dags/example_inlet_event_extra.py  |  2 +-
 airflow/utils/context.py                           | 27 ++++++++++++++++++++++
 .../authoring-and-scheduling/datasets.rst          | 14 ++++++-----
 tests/models/test_taskinstance.py                  | 18 +++++++--------
 6 files changed, 49 insertions(+), 18 deletions(-)

diff --git a/airflow/example_dags/example_dataset_alias.py b/airflow/example_dags/example_dataset_alias.py
index c50a89e34fb..4bfc6f51a73 100644
--- a/airflow/example_dags/example_dataset_alias.py
+++ b/airflow/example_dags/example_dataset_alias.py
@@ -67,7 +67,7 @@ with DAG(
     def produce_dataset_events_through_dataset_alias(*, outlet_events=None):
         bucket_name = "bucket"
         object_path = "my-task"
-        outlet_events["example-alias"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
+        outlet_events[DatasetAlias("example-alias")].add(Dataset(f"s3://{bucket_name}/{object_path}"))
 
     produce_dataset_events_through_dataset_alias()
 
diff --git a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
index 7d7227af39f..72863618e39 100644
--- a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
+++ b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
@@ -68,7 +68,9 @@ with DAG(
     def produce_dataset_events_through_dataset_alias_with_no_taskflow(*, outlet_events=None):
         bucket_name = "bucket"
         object_path = "my-task"
-        outlet_events["example-alias-no-taskflow"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
+        outlet_events[DatasetAlias("example-alias-no-taskflow")].add(
+            Dataset(f"s3://{bucket_name}/{object_path}")
+        )
 
     PythonOperator(
         task_id="produce_dataset_events_through_dataset_alias_with_no_taskflow",
diff --git a/airflow/example_dags/example_inlet_event_extra.py b/airflow/example_dags/example_inlet_event_extra.py
index 4b7567fc2f8..b07faf2bdfe 100644
--- a/airflow/example_dags/example_inlet_event_extra.py
+++ b/airflow/example_dags/example_inlet_event_extra.py
@@ -57,5 +57,5 @@ with DAG(
     BashOperator(
         task_id="read_dataset_event_from_classic",
         inlets=[ds],
-        bash_command="echo '{{ inlet_events['s3://output/1.txt'][-1].extra | tojson }}'",
+        bash_command="echo '{{ inlet_events[Dataset('s3://output/1.txt')][-1].extra | tojson }}'",
     )
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index a72885401f7..9dddcc3f16c 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -177,6 +177,14 @@ class OutletEventAccessor:
     def add(self, dataset: Dataset | str, extra: dict[str, Any] | None = None) -> None:
         """Add a DatasetEvent to an existing Dataset."""
         if isinstance(dataset, str):
+            warnings.warn(
+                (
+                    "Emitting dataset events using string is deprecated and will be removed in Airflow 3. "
+                    "Please use the Dataset object (renamed as Asset in Airflow 3) directly"
+                ),
+                DeprecationWarning,
+                stacklevel=2,
+            )
             dataset_uri = dataset
         elif isinstance(dataset, Dataset):
             dataset_uri = dataset.uri
@@ -216,6 +224,16 @@ class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
         return len(self._dict)
 
     def __getitem__(self, key: str | Dataset | DatasetAlias) -> OutletEventAccessor:
+        if isinstance(key, str):
+            warnings.warn(
+                (
+                    "Accessing outlet_events using string is deprecated and will be removed in Airflow 3. "
+                    "Please use the Dataset or DatasetAlias object (renamed as Asset and AssetAlias in Airflow 3) directly"
+                ),
+                DeprecationWarning,
+                stacklevel=2,
+            )
+
         event_key = extract_event_key(key)
         if event_key not in self._dict:
             self._dict[event_key] = OutletEventAccessor(extra={}, raw_key=key)
@@ -282,6 +300,15 @@ class InletEventsAccessors(Mapping[str, LazyDatasetEventSelectSequence]):
             join_clause = DatasetEvent.source_aliases
             where_clause = DatasetAliasModel.name == dataset_alias.name
         elif isinstance(obj, (Dataset, str)):
+            if isinstance(obj, str):
+                warnings.warn(
+                    (
+                        "Accessing inlet_events using string is deprecated and will be removed in Airflow 3. "
+                        "Please use the Dataset object (renamed as Asset in Airflow 3) directly"
+                    ),
+                    DeprecationWarning,
+                    stacklevel=2,
+                )
             dataset = self._datasets[extract_event_key(obj)]
             join_clause = DatasetEvent.dataset
             where_clause = DatasetModel.uri == dataset.uri
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index a69c09bc13b..c5d117ab5a5 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -432,7 +432,7 @@ The following example creates a dataset event against the S3 URI ``f"s3://bucket
 
     @task(outlets=[DatasetAlias("my-task-outputs")])
     def my_task_with_outlet_events(*, outlet_events):
-        outlet_events["my-task-outputs"].add(Dataset("s3://bucket/my-task"), 
extra={"k": "v"})
+        
outlet_events[DatasetAlias("my-task-outputs")].add(Dataset("s3://bucket/my-task"),
 extra={"k": "v"})
 
 
 **Emit a dataset event during task execution through yielding Metadata**
@@ -462,11 +462,11 @@ Only one dataset event is emitted for an added dataset, even if it is added to t
         ]
     )
     def my_task_with_outlet_events(*, outlet_events):
-        outlet_events["my-task-outputs-1"].add(Dataset("s3://bucket/my-task"), 
extra={"k": "v"})
+        
outlet_events[DatasetAlias("my-task-outputs-1")].add(Dataset("s3://bucket/my-task"),
 extra={"k": "v"})
         # This line won't emit an additional dataset event as the dataset and 
extra are the same as the previous line.
-        outlet_events["my-task-outputs-2"].add(Dataset("s3://bucket/my-task"), 
extra={"k": "v"})
+        
outlet_events[DatasetAlias("my-task-outputs-2")].add(Dataset("s3://bucket/my-task"),
 extra={"k": "v"})
         # This line will emit an additional dataset event as the extra is 
different.
-        outlet_events["my-task-outputs-3"].add(Dataset("s3://bucket/my-task"), 
extra={"k2": "v2"})
+        
outlet_events[DatasetAlias("my-task-outputs-3")].add(Dataset("s3://bucket/my-task"),
 extra={"k2": "v2"})
 
 Scheduling based on dataset aliases
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -487,7 +487,7 @@ The dataset alias is resolved to the datasets during DAG parsing. Thus, if the "
 
         @task(outlets=[DatasetAlias("example-alias")])
         def produce_dataset_events(*, outlet_events):
-            outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"))
+            outlet_events[DatasetAlias("example-alias")].add(Dataset("s3://bucket/my-task"))
 
 
     with DAG(dag_id="dataset-consumer", 
schedule=Dataset("s3://bucket/my-task")):
@@ -511,7 +511,9 @@ As mentioned in :ref:`Fetching information from previously emitted dataset event
 
         @task(outlets=[DatasetAlias("example-alias")])
         def produce_dataset_events(*, outlet_events):
-            outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"), 
extra={"row_count": 1})
+            outlet_events[DatasetAlias("example-alias")].add(
+                Dataset("s3://bucket/my-task"), extra={"row_count": 1}
+            )
 
 
     with DAG(dag_id="dataset-alias-consumer", schedule=None):
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 468dc2c9300..b5dcb43de26 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2411,7 +2411,7 @@ class TestTaskInstance:
 
             @task(outlets=Dataset("test_outlet_dataset_extra_1"))
             def write1(*, outlet_events):
-                outlet_events["test_outlet_dataset_extra_1"].extra = {"foo": 
"bar"}
+                outlet_events[Dataset("test_outlet_dataset_extra_1")].extra = 
{"foo": "bar"}
 
             write1()
 
@@ -2453,8 +2453,8 @@ class TestTaskInstance:
 
             @task(outlets=Dataset("test_outlet_dataset_extra"))
             def write(*, outlet_events):
-                outlet_events["test_outlet_dataset_extra"].extra = {"one": 1}
-                outlet_events["different_uri"].extra = {"foo": "bar"}  # Will 
be silently dropped.
+                outlet_events[Dataset("test_outlet_dataset_extra")].extra = 
{"one": 1}
+                outlet_events[Dataset("different_uri")].extra = {"foo": "bar"} 
 # Will be silently dropped.
 
             write()
 
@@ -2722,22 +2722,22 @@ class TestTaskInstance:
 
             @task(outlets=Dataset("test_inlet_dataset_extra"))
             def write(*, ti, outlet_events):
-                outlet_events["test_inlet_dataset_extra"].extra = {"from": 
ti.task_id}
+                outlet_events[Dataset("test_inlet_dataset_extra")].extra = 
{"from": ti.task_id}
 
             @task(inlets=Dataset("test_inlet_dataset_extra"))
             def read(*, inlet_events):
-                second_event = inlet_events["test_inlet_dataset_extra"][1]
+                second_event = inlet_events[Dataset("test_inlet_dataset_extra")][1]
                 assert second_event.uri == "test_inlet_dataset_extra"
                 assert second_event.extra == {"from": "write2"}
 
-                last_event = inlet_events["test_inlet_dataset_extra"][-1]
+                last_event = inlet_events[Dataset("test_inlet_dataset_extra")][-1]
                 assert last_event.uri == "test_inlet_dataset_extra"
                 assert last_event.extra == {"from": "write3"}
 
                 with pytest.raises(KeyError):
-                    inlet_events["does_not_exist"]
+                    inlet_events[Dataset("does_not_exist")]
                 with pytest.raises(IndexError):
-                    inlet_events["test_inlet_dataset_extra"][5]
+                    inlet_events[Dataset("test_inlet_dataset_extra")][5]
 
                 # TODO: Support slices.
 
@@ -2798,7 +2798,7 @@ class TestTaskInstance:
                 assert last_event.extra == {"from": "write3"}
 
                 with pytest.raises(KeyError):
-                    inlet_events["does_not_exist"]
+                    inlet_events[Dataset("does_not_exist")]
                 with pytest.raises(KeyError):
                     inlet_events[DatasetAlias("does_not_exist")]
                 with pytest.raises(IndexError):
