kyungjunleeme commented on issue #52932:
URL: https://github.com/apache/airflow/issues/52932#issuecomment-3069588649

   @charlie-liner @kgw7401 
   
   https://github.com/apache/airflow/pull/41348
   Note: this PR is a large refactoring.
   
   ```
   TaskInstance (source) → AssetEvent is recorded → source_dag_id and source_run_id are stored
                                                                       ↓
                   source_dag_run relationship allows tracing back to the DagRun
   ```
   
   ```python
   class AssetEvent(Base):
       """
       A table to store asset events.

       :param asset_id: reference to AssetModel record
       :param extra: JSON field for arbitrary extra info
       :param source_task_id: the task_id of the TI which updated the asset
       :param source_dag_id: the dag_id of the TI which updated the asset
       :param source_run_id: the run_id of the TI which updated the asset
       :param source_map_index: the map_index of the TI which updated the asset
       :param timestamp: the time the event was logged

       We use relationships instead of foreign keys so that asset events are
       not deleted even if the foreign key object is.

       Accordingly, none of the columns below declares a ForeignKey. The
       ``source_task_instance``, ``source_dag_run`` and ``asset`` relationships
       are ``viewonly`` and are resolved purely through explicit ``primaryjoin``
       expressions over the ``source_*`` / ``asset_id`` columns.
       """

       id = Column(Integer, primary_key=True, autoincrement=True)
       # Plain integer (no ForeignKey); tied to AssetModel.id only through the
       # viewonly ``asset`` relationship declared further down.
       asset_id = Column(Integer, nullable=False)
       # NOTE(review): ``default={}`` is a single dict object shared as the
       # Python-side default; presumably safe only as long as callers never
       # mutate ``extra`` in place on a freshly inserted event -- confirm.
       extra = Column(sqlalchemy_jsonfield.JSONField(json=json), 
nullable=False, default={})
       # The four source_* columns identify the producing task instance. All
       # are nullable, so an event is allowed to have no source TI recorded.
       source_task_id = Column(StringID(), nullable=True)
       source_dag_id = Column(StringID(), nullable=True)
       source_run_id = Column(StringID(), nullable=True)
       # Database-side default of -1 when no value is supplied.
       # NOTE(review): presumably -1 denotes a non-mapped task -- confirm.
       source_map_index = Column(Integer, nullable=True, 
server_default=text("-1"))
       # Filled in Python at insert time by calling timezone.utcnow (a
       # client-side ``default``, unlike the ``server_default`` above).
       timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

       __tablename__ = "asset_event"
       __table_args__ = (
           # Composite index on (asset_id, timestamp); presumably serves
           # per-asset, time-ordered event lookups -- confirm against queries.
           Index("idx_asset_id_timestamp", asset_id, timestamp),
           {"sqlite_autoincrement": True},  # ensures PK values not reused
       )

       # DagRuns linked to this event through ``association_table`` (a
       # secondary/association-table relationship). The backref adds the
       # reverse attribute ``DagRun.consumed_asset_events``.
       created_dagruns = relationship(
           "DagRun",
           secondary=association_table,
           backref="consumed_asset_events",
       )

       # AssetAliasModel rows linked through
       # ``asset_alias_asset_event_association_table``; kept in sync with the
       # ``AssetAliasModel.asset_events`` attribute via back_populates.
       source_aliases = relationship(
           "AssetAliasModel",
           secondary=asset_alias_asset_event_association_table,
           back_populates="asset_events",
       )

       # Read-only, scalar (uselist=False) link to the TaskInstance whose
       # dag_id/run_id/task_id/map_index all equal this event's source_*
       # values. Because no ForeignKey exists, foreign() annotates the
       # TaskInstance columns as the "foreign" side of the join condition.
       # lazy="select" loads it with a separate SELECT on first access.
       source_task_instance = relationship(
           "TaskInstance",
           primaryjoin="""and_(
               AssetEvent.source_dag_id == foreign(TaskInstance.dag_id),
               AssetEvent.source_run_id == foreign(TaskInstance.run_id),
               AssetEvent.source_task_id == foreign(TaskInstance.task_id),
               AssetEvent.source_map_index == foreign(TaskInstance.map_index),
           )""",
           viewonly=True,
           lazy="select",
           uselist=False,
       )
       # Read-only, scalar link back to the DagRun that produced the event,
       # matched on (source_dag_id, source_run_id) only -- task_id and
       # map_index are not part of this join.
       source_dag_run = relationship(
           "DagRun",
           primaryjoin="""and_(
               AssetEvent.source_dag_id == foreign(DagRun.dag_id),
               AssetEvent.source_run_id == foreign(DagRun.run_id),
           )""",
           viewonly=True,
           lazy="select",
           uselist=False,
       )
       # Read-only, scalar link to the AssetModel row whose id equals asset_id.
       asset = relationship(
           AssetModel,
           primaryjoin="AssetEvent.asset_id == foreign(AssetModel.id)",
           viewonly=True,
           lazy="select",
           uselist=False,
       )
   ```
   
   I hope this is helpful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to