kyungjunleeme commented on issue #52932: URL: https://github.com/apache/airflow/issues/52932#issuecomment-3069588649
@charlie-liner @kgw7401 https://github.com/apache/airflow/pull/41348 is a large refactoring PR. The relevant part is how an `AssetEvent` links back to the run that produced it:

```
TaskInstance (source)
  → AssetEvent is recorded
  → source_dag_id and source_run_id are stored
        ↓
source_dag_run relationship allows tracing back to the DagRun
```

```python
# Excerpt of the AssetEvent model as quoted from the Airflow codebase; the imports
# and helpers it uses (Base, StringID, UtcDateTime, the association tables,
# AssetModel) are defined elsewhere there.
class AssetEvent(Base):
    """
    A table to store assets events.

    :param asset_id: reference to AssetModel record
    :param extra: JSON field for arbitrary extra info
    :param source_task_id: the task_id of the TI which updated the asset
    :param source_dag_id: the dag_id of the TI which updated the asset
    :param source_run_id: the run_id of the TI which updated the asset
    :param source_map_index: the map_index of the TI which updated the asset
    :param timestamp: the time the event was logged

    We use relationships instead of foreign keys so that
    asset events are not deleted even if the foreign key object is.
    """

    id = Column(Integer, primary_key=True, autoincrement=True)
    asset_id = Column(Integer, nullable=False)
    extra = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
    source_task_id = Column(StringID(), nullable=True)
    source_dag_id = Column(StringID(), nullable=True)
    source_run_id = Column(StringID(), nullable=True)
    source_map_index = Column(Integer, nullable=True, server_default=text("-1"))
    timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)

    __tablename__ = "asset_event"
    __table_args__ = (
        Index("idx_asset_id_timestamp", asset_id, timestamp),
        {"sqlite_autoincrement": True},  # ensures PK values not reused
    )

    created_dagruns = relationship(
        "DagRun",
        secondary=association_table,
        backref="consumed_asset_events",
    )

    source_aliases = relationship(
        "AssetAliasModel",
        secondary=asset_alias_asset_event_association_table,
        back_populates="asset_events",
    )

    # Read-only link to the producing TaskInstance, matched on its four key columns.
    source_task_instance = relationship(
        "TaskInstance",
        primaryjoin="""and_(
            AssetEvent.source_dag_id == foreign(TaskInstance.dag_id),
            AssetEvent.source_run_id == foreign(TaskInstance.run_id),
            AssetEvent.source_task_id == foreign(TaskInstance.task_id),
            AssetEvent.source_map_index == foreign(TaskInstance.map_index),
        )""",
        viewonly=True,
        lazy="select",
        uselist=False,
    )
    # Read-only link to the producing DagRun, matched on (dag_id, run_id).
    source_dag_run = relationship(
        "DagRun",
        primaryjoin="""and_(
            AssetEvent.source_dag_id == foreign(DagRun.dag_id),
            AssetEvent.source_run_id == foreign(DagRun.run_id),
        )""",
        viewonly=True,
        lazy="select",
        uselist=False,
    )
    asset = relationship(
        AssetModel,
        primaryjoin="AssetEvent.asset_id == foreign(AssetModel.id)",
        viewonly=True,
        lazy="select",
        uselist=False,
    )
```

It may be helpful.
