dstandish commented on code in PR #57360:
URL: https://github.com/apache/airflow/pull/57360#discussion_r2495520257
##########
airflow-core/src/airflow/models/asset.py:
##########
@@ -859,3 +860,58 @@ def __repr__(self) -> str:
]:
args.append(f"{attr}={getattr(self, attr)!r}")
return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class AssetPartitionDagRun(Base):
+ """
+ Keep track of new runs of a dag run per partition key.
+
+ Think of AssetPartitionDagRun as a provisional dag run. This record is
created
+ when there's an asset event that contributes to the creation of a dag run
for
+ this dag_id / partition_key combo. It may need to wait for other events
before
+ it's ready to be created though, and the scheduler will make this
determination.
+
+ We can look up the AssetEvents that contribute to AssetPartitionDagRun
entities
+ with the PartitionedAssetKeyLog mapping table.
+
+ Where dag_run_id is null, the dag run has not yet been created.
+ We should not allow more than one like this. But to guard against
+ an accident, we should always work on the latest one.
+ """
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True,
autoincrement=True)
+ target_dag_id: Mapped[str | None] = mapped_column(StringID(),
nullable=False)
+ target_dag_run_id: Mapped[int | None] = mapped_column(Integer(),
nullable=True)
Review Comment:
sure, i can do that
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]