ephraimbuddy commented on code in PR #55216:
URL: https://github.com/apache/airflow/pull/55216#discussion_r2319264680


##########
airflow-core/src/airflow/models/trigger.py:
##########
@@ -390,6 +390,142 @@ def get_sorted_triggers(cls, capacity: int, alive_triggerer_ids: list[int] | Sel
         return result
 
 
+class TriggerWatermark(Base):
+
+    __tablename__ = "trigger_watermark"
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # Leverage the
+    trigger_hash = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    key = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    # The value is stored as a string. We'll need to have some sort of a "serialization" method that
+    # handles this
+    value = Column(
+        String(length=1500).with_variant(
+            String(
+                length=1500,
+                # latin1 allows for more indexed length in mysql
+                # and this field should only be ascii chars
+                collation="latin1_general_cs",
+            ),
+            "mysql",
+        ),
+        nullable=False,
+    )
+
+    @classmethod
+    @provide_session
+    def set(
+        cls,
+        trigger_hash: str,
+        key: str,
+        value: Any = None,
+        session: Session = NEW_SESSION,
+    ):
+        if not trigger_hash:
+            raise ValueError(
+                "Invalid params. A ``trigger_hash`` of type ``str`` must be "
+                "passed to ``set``. This must be done in the ``set_watermark`` "
+                "method of your Trigger."
+            )
+
+        if not key:
+            raise ValueError(
+                "Invalid params. A ``key`` of type ``str`` must be "
+                "passed to ``set``. This must be done in the ``set_watermark`` "
+                "method of your Trigger."
+            )
+
+        # Query and delete the existing value
+        trigger_watermark = session.query(cls).filter(

Review Comment:
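   Use the SQLAlchemy 2.0+ query style here: build the statement with `select(cls).where(...)` and execute it via `session.scalars(...)` rather than the legacy `session.query(cls).filter(...)` API.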



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to