BasPH commented on code in PR #30292:
URL: https://github.com/apache/airflow/pull/30292#discussion_r1148323631


##########
airflow/operators/trigger_dagrun.py:
##########
@@ -40,9 +42,27 @@
 
 
 if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
     from airflow.models.taskinstance import TaskInstanceKey
 
 
+def _parse_execution_date(date):

Review Comment:
   Let's add type hints to indicate expected input & output types.



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -40,9 +42,27 @@
 
 
 if TYPE_CHECKING:
+    from sqlalchemy.orm.session import Session
+
     from airflow.models.taskinstance import TaskInstanceKey
 
 
+def _parse_execution_date(date):
+    if isinstance(date, datetime.datetime):
+        return date
+    elif isinstance(date, str):
+        return timezone.parse(date)
+    else:
+        return timezone.utcnow()
+
+
+def _get_run_id(run_id, parsed_execution_date):

Review Comment:
   Let's add type hints to indicate expected input & output types.



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,40 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", 
self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):
+        parsed_execution_date = _parse_execution_date(self.execution_date)
+
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == parsed_execution_date)
+            .all()
+        )
+
+        if len(dag_run) > 1:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} appears too"
+                " many times, {len(dag_run)}"
+            )
+
+        dag_run = dag_run[0]
+
+        if not dag_run:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} doesn't"
+                " appear to exist"
+            )
+
+        state = dag_run.state
+
+        if state in self.failed_states:
+            raise AirflowException(f"{self.trigger_dag_id} failed with failed 
states {state}")
+        if state in self.allowed_states:
+            self.log.info("%s finished with allowed state %s", 
self.trigger_dag_id, state)
+            return
+
+        raise AirflowException(
+            f"{self.trigger_dag_id} return {state} which is not in 
{self.failed_states}"
+            " or {self.allowed_states}"

Review Comment:
   ```suggestion
               f" or {self.allowed_states}"
   ```



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,40 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", 
self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):
+        parsed_execution_date = _parse_execution_date(self.execution_date)
+
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == parsed_execution_date)
+            .all()
+        )
+
+        if len(dag_run) > 1:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} appears too"
+                " many times, {len(dag_run)}"
+            )
+
+        dag_run = dag_run[0]
+
+        if not dag_run:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} doesn't"
+                " appear to exist"
+            )
+
+        state = dag_run.state
+
+        if state in self.failed_states:
+            raise AirflowException(f"{self.trigger_dag_id} failed with failed 
states {state}")

Review Comment:
   ```suggestion
               raise AirflowException(f"{self.trigger_dag_id} failed with 
failed state {state}")
   ```



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,40 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", 
self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):
+        parsed_execution_date = _parse_execution_date(self.execution_date)
+
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == parsed_execution_date)
+            .all()
+        )
+
+        if len(dag_run) > 1:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} appears too"
+                " many times, {len(dag_run)}"
+            )
+
+        dag_run = dag_run[0]
+
+        if not dag_run:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} doesn't"
+                " appear to exist"

Review Comment:
   ```suggestion
                   f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}"
   ```
   
   This would make the message more explicit.



##########
airflow/operators/trigger_dagrun.py:
##########
@@ -185,3 +215,40 @@ def execute(self, context: Context):
                 if state in self.allowed_states:
                     self.log.info("%s finished with allowed state %s", 
self.trigger_dag_id, state)
                     return
+
+    @provide_session
+    def execute_complete(self, context: Context, session: "Session", **kwargs):
+        parsed_execution_date = _parse_execution_date(self.execution_date)
+
+        dag_run = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == self.trigger_dag_id, 
DagRun.execution_date == parsed_execution_date)
+            .all()
+        )
+
+        if len(dag_run) > 1:
+            raise AirflowException(
+                f"{self.trigger_dag_id} with execution_data of 
{self.execution_date} appears too"
+                " many times, {len(dag_run)}"
+            )

Review Comment:
   ```suggestion
               raise AirflowException(
                   f"Detected duplicate DAG run ({self.trigger_dag_id}) and 
execution date "
                   f"({self.execution_date}), found {len(dag_run)} records"
               )
   ```
   
   This fixes the typo ("date" instead of "data") and makes the message more explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to