uranusjr opened a new issue, #37810:
URL: https://github.com/apache/airflow/issues/37810

   ### Description
   
   To eventually support the construct and UI we’re aiming for in assets, we 
need to attach metadata to the actual data, not to the task that produces it, 
nor to the location it is written to.
   
   In the task-based web UI, we can show the attached metadata on the task 
that emits the dataset event, giving the impression that the metadata is 
directly associated with the task. In the implementation, however, the metadata 
would only be associated with the dataset, and only indirectly related to the 
task by the fact that the task emits the event.
   
   ### Use case/motivation
   
   An Airflow task generating data may want to attach information about that 
data. Airflow does not currently provide a good interface for this. The only 
thing that resembles such a feature is attaching an extra dict to `Dataset`, 
like this:
   
   ```python
   @task(outlets=[Dataset("s3://bucket/key", extra={"xx": "yy"})])
   def producer():
       # Write to the file on S3...
       ...
   ```
   
   This is, however, quite limiting. It may be good enough for static 
information such as *who owns this data*, but not for information that is only 
known at runtime and provides additional context to the generated data.
   
   ### Store runtime-populated extras on `DatasetEvent`
   
   When a dataset event is emitted, the corresponding `DatasetEvent` model in 
the database already has a field called `extra`. However, this field is 
currently not populated when the event is generated from a task outlet (only 
when the event is created via the REST API).
   
   [A previous design discussion contains the following comment from 
@blag:](https://github.com/apache/airflow/issues/35297#issuecomment-1843720880)
   
   > The only intent that I ever had (and note: I was not the author of the 
dataset AIP) regarding the `extra` fields for datasets and dataset events, was 
to allow third party integrations to easily store information from external 
systems that wasn't captured in Airflow's database schema, eg: to do so without 
forking the schema migrations.
   
   and
   
   > But if all you are looking to do is pass information between task 
instances in the same DAG run or between task instances in different DAG runs, 
I believe the Airflow mechanism to do this is XComs, even with data-aware 
Airflow.
   
   However, I would argue that user code in an Airflow DAG should also be able 
to store custom information. While the information is readable in downstream 
tasks, and is thus technically a mechanism to pass data between tasks, the main 
intention behind the design is to annotate the generated data, so it does not 
go against the original design.
   
   ### Provide extras at runtime
   
   The task function (either `@task`-decorated, or a classic operator’s 
`execute`) will be able to attach values to a Dataset URI in the function. This 
is done by an accessor proxy under the key `dataset_events`, so in the task 
function you can:
   
   ```python
   @task(outlets=[Dataset("s3://bucket/key")])
   def producer(*, dataset_events: dict[str, DatasetEventProxy]):
       dataset_events["s3://bucket/key"].extra["desc"] = "foo bar"
   ```
   
   After the task function’s execution, the extras provided dynamically are 
written to the `DatasetEvent` entry generated for the dataset. Note especially 
that this is entirely distinct from `extra` on `Dataset`.
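   A minimal sketch of how the accessor could behave, assuming a hypothetical 
`DatasetEventProxy` and a `run_task` helper (names here are illustrative, not 
Airflow’s actual implementation): the accessor maps a URI, or a dataset-like 
object carrying a `.uri`, to a proxy whose `extra` dict the task function 
mutates, and the collected extras are gathered after the function returns.

```python
class DatasetEventProxy:
    """Holds the runtime extras attached for one dataset URI."""

    def __init__(self) -> None:
        self.extra: dict[str, object] = {}


class DatasetEventAccessor(dict):
    """Maps a dataset URI, or an object with a .uri attribute, to its proxy."""

    def __missing__(self, key):
        uri = getattr(key, "uri", key)  # accept Dataset-like objects or plain URIs
        proxy = self.get(uri)
        if proxy is None:
            proxy = self[uri] = DatasetEventProxy()
        return proxy


def run_task(fn):
    """Run a task function, then collect the extras it attached.

    In the proposal, each proxy's .extra would be written onto the
    DatasetEvent row created for the corresponding dataset.
    """
    events = DatasetEventAccessor()
    fn(dataset_events=events)
    return {uri: proxy.extra for uri, proxy in events.items()}
```

   With this sketch, a producer that sets 
`dataset_events["s3://bucket/key"].extra["desc"] = "foo bar"` yields a 
collected mapping of `{"s3://bucket/key": {"desc": "foo bar"}}` after 
execution.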
   
   Instead of using the URI, you can also use the dataset object directly to 
access the proxy:
   
   ```python
   target = Dataset("s3://bucket/key")
   
   @task(outlets=[target])
   def producer(*, dataset_events: dict[str, DatasetEventProxy]):
       dataset_events[target].extra["desc"] = "foo bar"
   ```
   
   Example using the context dict instead:
   
   ```python
    from airflow.operators.python import get_current_context

   @task(outlets=[target])
   def producer():
       context = get_current_context()
       context["dataset_events"][target].extra["desc"] = "foo bar"
   ```
   
   With a classic operator:
   
   ```python
   class MyOperator(BaseOperator):
       def execute(self, context: Context) -> None:
            # Write to the file on S3...
           context["dataset_events"][target].extra["desc"] = "foo bar"
   ```
   
   ### Show dataset event extras in web UI
   
   Both dataset and dataset event extras currently have no visibility in the 
web UI. This is somewhat acceptable for datasets, where the extra dict is 
static, but is a problem for dynamically generated values. Additional 
components should be added to the web UI to display extras emitted with a 
dataset event.
   
   An obvious first addition would be a table in the task instance panel of 
the Grid view, shown when the task instance emits dataset events with extras. 
Quick UI mock:
   
   
![dataset-extra-view-mock.png](https://prod-files-secure.s3.us-west-2.amazonaws.com/3f2d561c-a102-4577-8be3-cc15bdd606a8/02f0d565-fe3b-4ffa-863c-fc2717444c3a/dataset-extra-view-mock.png)
   
   Each key and value will simply be stringified for display in the table. 
This should be enough for simple data, since the extra dict currently needs to 
be JSON-compatible. We can discuss richer data (similar to how Jupyter displays 
a DataFrame), and putting this information in other places (e.g. the Dataset 
view), in the future.
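   The stringification described above can be sketched like this; 
`extra_to_rows` is a hypothetical helper for illustration, not an Airflow API. 
Strings are rendered directly, and everything else falls back to compact JSON, 
which is safe given that the extra dict must be JSON-compatible:

```python
import json


def extra_to_rows(extra: dict) -> list[tuple[str, str]]:
    """Flatten a JSON-compatible extra dict into (key, value) display rows."""
    rows = []
    for key, value in extra.items():
        if isinstance(value, str):
            rows.append((key, value))          # show strings as-is
        else:
            rows.append((key, json.dumps(value)))  # numbers, lists, dicts, etc.
    return rows
```

   Richer rendering (per-type widgets, truncation for large values) could 
replace the JSON fallback later without changing the table shape.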
   
   ### Related issues
   
   https://github.com/apache/airflow/issues/35297
   https://github.com/apache/airflow/pull/36075
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

