potiuk commented on a change in pull request #20975: URL: https://github.com/apache/airflow/pull/20975#discussion_r798637320
########## File path: airflow/models/xcom.py ########## @@ -43,43 +44,46 @@ MAX_XCOM_SIZE = 49344 XCOM_RETURN_KEY = 'return_value' -# Work around 'airflow task test' generating a temporary in-memory DAG run -# without storing it in the database. To avoid interfering with actual XCom -# entries but still behave _somewhat_ consistently, we store XCom to a distant -# time in the future. Eventually we want to migrate XCom's primary to use run_id -# instead, so execution_date can just be None for this case. +# Stand-in value for 'airflow task test' generating a temporary in-memory DAG +# run without storing it in the database. IN_MEMORY_DAGRUN_ID = "__airflow_in_memory_dagrun__" -# This is the largest possible value we can store in MySQL. -# https://dev.mysql.com/doc/refman/5.7/en/datetime.html -_DISTANT_FUTURE = datetime.datetime(2038, 1, 19, 3, 14, 7, tzinfo=timezone.utc) - class BaseXCom(Base, LoggingMixin): """Base class for XCom objects.""" __tablename__ = "xcom" - key = Column(String(512, **COLLATION_ARGS), primary_key=True) + dagrun_id = Column(Integer(), nullable=False, primary_key=True) + task_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False, primary_key=True) + key = Column(String(512, **COLLATION_ARGS), nullable=False, primary_key=True) + + # Denormalized for easier lookup. + dag_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False) + run_id = Column(String(ID_LEN, **COLLATION_ARGS), nullable=False) + value = Column(LargeBinary) timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False) - execution_date = Column(UtcDateTime, primary_key=True) - - # source information - task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True) - dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True) - # For _now_, we link this via execution_date, in 2.3 we will migrate this table to use run_id too dag_run = relationship( "DagRun", primaryjoin="""and_( BaseXCom.dag_id == foreign(DagRun.dag_id), - BaseXCom.execution_date == foreign(DagRun.execution_date) + BaseXCom.run_id == foreign(DagRun.run_id), )""", uselist=False, + lazy="joined", passive_deletes="all", ) - run_id = association_proxy("dag_run", "run_id") + execution_date = association_proxy("dag_run", "execution_date") + + __table_args__ = ( + # Ideally we should create a unique index over (key, dag_id, task_id, run_id), + # but it goes over MySQL's index length limit. So we instead create indexes + # separately, and enforce uniqueness with DagRun.id instead. Review comment: Well... Elephant in the room again :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
