ephraimbuddy commented on code in PR #57360:
URL: https://github.com/apache/airflow/pull/57360#discussion_r2483903434


##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1187,6 +1188,24 @@ def test_create_dagrun_job_id_is_set(self, testing_dag_bundle):
         )
         assert dr.creating_job_id == job_id
 
+    @pytest.mark.parametrize(["partition_key"], [[None], ["my-key"], [123]])
+    def test_create_dagrun_partition_key(self, partition_key, testing_dag_bundle):
+        dag = DAG(dag_id="test_create_dagrun_partition_key", schedule=None)
+        scheduler_dag = sync_dag_to_db(dag)

Review Comment:
   ```suggestion
       def test_create_dagrun_partition_key(self, partition_key, dag_maker):
           with dag_maker("test_create_dagrun_partition_key") as scheduler_dag:
   ```
   nit: this should work as well, and it removes the need for the `testing_dag_bundle` fixture and the `sync_dag_to_db` step.



##########
airflow-core/src/airflow/migrations/versions/0092_3_2_0_add_partition_key_field.py:
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add partition_key field.
+
+Revision ID: 665854ef0536
+Revises: b87d2135fa50
+Create Date: 2025-10-14 10:27:04.345130
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+revision = "665854ef0536"
+down_revision = "b87d2135fa50"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+    """Apply Add partition_key field."""
+    op.create_table(
+        "partitioned_asset_key_log",
+        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
+        sa.Column("asset_id", sa.Integer(), nullable=False),
+        sa.Column("asset_event_id", sa.Integer(), nullable=False),
+        sa.Column("asset_partition_dag_run_id", sa.Integer(), nullable=False),
+        sa.Column("source_partition_key", sa.String(length=250), nullable=False),

Review Comment:
   Unless `sa.String` is being used here on purpose, we should probably use `StringID` from `airflow.migrations.db_types`, since it includes collations. I see that the ORM model uses `StringID`, so using it in the migration as well would keep the two consistent.
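
To illustrate why the collation matters, here is a minimal sketch in plain SQLAlchemy. It does not use Airflow's `StringID`; `string_id`, `ID_LEN`, the table name, and the `utf8mb3_bin` collation are assumptions made for the example. It only shows how a string type that carries a collation renders differently from a bare `sa.String` on MySQL:

```python
import sqlalchemy as sa
from sqlalchemy.dialects import mysql, sqlite
from sqlalchemy.schema import CreateTable

ID_LEN = 250  # assumption: mirrors the length=250 used in the migration


def string_id(length: int = ID_LEN) -> sa.types.TypeEngine:
    """Hypothetical stand-in for a collation-aware ID type.

    Uses a plain VARCHAR everywhere except MySQL, where a binary
    collation is attached (the collation name is an assumption).
    """
    return sa.String(length).with_variant(
        sa.String(length, collation="utf8mb3_bin"), "mysql"
    )


metadata = sa.MetaData()
demo = sa.Table(
    "demo_partition_key_log",  # hypothetical table, not the real one
    metadata,
    sa.Column("id", sa.Integer(), primary_key=True),
    sa.Column("plain_key", sa.String(length=ID_LEN), nullable=False),
    sa.Column("collated_key", string_id(), nullable=False),
)

# Render the CREATE TABLE statement for two dialects without connecting.
mysql_ddl = str(CreateTable(demo).compile(dialect=mysql.dialect()))
sqlite_ddl = str(CreateTable(demo).compile(dialect=sqlite.dialect()))

print(mysql_ddl)
print(sqlite_ddl)
```

If the migration and the ORM model disagree on the column type, the schema produced by running migrations can differ from the one described by the models, which is the inconsistency this comment is about.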



##########
airflow-core/src/airflow/models/asset.py:
##########
@@ -859,3 +860,58 @@ def __repr__(self) -> str:
         ]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class AssetPartitionDagRun(Base):
+    """
+    Keep track of new runs of a dag run per partition key.
+
+    Think of AssetPartitionDagRun as a provisional dag run. This record is created
+    when there's an asset event that contributes to the creation of a dag run for
+    this dag_id / partition_key combo. It may need to wait for other events before
+    it's ready to be created though, and the scheduler will make this determination.
+
+    We can look up the AssetEvents that contribute to AssetPartitionDagRun entities
+    with the PartitionedAssetKeyLog mapping table.
+
+    Where dag_run_id is null, the dag run has not yet been created.
+    We should not allow more than one like this. But to guard against
+    an accident, we should always work on the latest one.
+    """
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    target_dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=False)
+    target_dag_run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)

Review Comment:
   Should `target_dag_run_id` be a string or an integer? `dag_run_id` is an integer, so the `StringID()` type here looks inconsistent.


