jedcunningham commented on code in PR #25086:
URL: https://github.com/apache/airflow/pull/25086#discussion_r922222894


##########
airflow/models/taskinstance.py:
##########
@@ -1518,7 +1518,8 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            if self.state != State.SKIPPED:

Review Comment:
   Wait, should we check that it's `State.SUCCESS` instead?
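
For comparison, a minimal sketch of how the two guards differ. The enum below is a stand-in with only a few states, not the one imported from Airflow, and the helper names are made up for illustration. Whether the non-success states can actually reach this line in `_run_raw_task` is not something the diff shows.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in for Airflow's enum; only a subset of states, for illustration."""

    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UP_FOR_RETRY = "up_for_retry"


def emits_if_not_skipped(state):
    # Guard as written in the diff: everything except SKIPPED passes.
    return state != TaskInstanceState.SKIPPED


def emits_if_success(state):
    # Guard proposed in this comment: only SUCCESS passes.
    return state == TaskInstanceState.SUCCESS


# States on which the two guards disagree.
differing = [
    s.name
    for s in TaskInstanceState
    if emits_if_not_skipped(s) != emits_if_success(s)
]
print(differing)
```

In this stand-in, the guards disagree on every state that is neither `SUCCESS` nor `SKIPPED`, which is why the positive check is the narrower one.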



##########
airflow/models/taskinstance.py:
##########
@@ -1518,7 +1518,8 @@ def _run_raw_task(
         if not test_mode:
             session.add(Log(self.state, self))
             session.merge(self)
-            self._create_dataset_dag_run_queue_records(session=session)
+            if self.state != State.SKIPPED:

Review Comment:
   Also, you should be using `TaskInstanceState` instead of `State`.
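
A small sketch of why the swap should be behavior-neutral. It assumes `TaskInstanceState` is a `str`-based `Enum` and that `State` simply re-exports its members. Both classes below are stand-ins written for this example, not imports from Airflow.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Stand-in: assumed to mirror a str-based enum of task states."""

    SUCCESS = "success"
    SKIPPED = "skipped"


class State:
    """Stand-in: assumed to alias the enum members under the old names."""

    SUCCESS = TaskInstanceState.SUCCESS
    SKIPPED = TaskInstanceState.SKIPPED


# A raw string, as a state value might look after a round trip through the DB.
stored_state = "skipped"

old_guard = stored_state != State.SKIPPED
new_guard = stored_state != TaskInstanceState.SKIPPED

# Both guards evaluate the same way because the member compares equal to its value.
print(old_guard, new_guard)
```

Under those assumptions the change is about using the narrower, typed name rather than about changing the result of the comparison.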



##########
tests/models/test_taskinstance.py:
##########
@@ -1521,6 +1521,46 @@ def test_outlet_datasets(self, create_task_instance):
             .count()
         ) == 1
 
+    def test_outlet_datasets_skipped(self, create_task_instance):
+        """
+        Verify that when we have an outlet dataset on a task, and the task
+        is skipped, a DatasetDagRunQueue is not logged, and a DatasetEvent is
+        not generated
+        """
+        from airflow.example_dags import example_datasets
+        from airflow.example_dags.example_datasets import dag7, dag8
+
+        session = settings.Session()
+        dagbag = DagBag(dag_folder=example_datasets.__file__)
+        dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+        dagbag.sync_to_db(session=session)
+        run_id = str(uuid4())
+        dr = DagRun(dag7.dag_id, run_id=run_id, run_type='anything')
+        session.merge(dr)
+        task = dag7.get_task('skip_task')
+        ti = TaskInstance(task, run_id=run_id)
+        session.merge(ti)
+        session.commit()
+        ti._run_raw_task()
+        ti.refresh_from_db()
+        assert ti.state == State.SKIPPED
+
+        # check that no queue records exist for dataset 8
+        assert (

Review Comment:
   We probably want to assert that the table is empty instead. If we need to be this selective in the queries, we should get [clear_db_datasets](https://github.com/apache/airflow/blob/4efb1884e5a86ec65a58890e290fd89822e65c05/tests/test_utils/db.py#L56) in the mix (soon it'll purge the events too).
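
Roughly the pattern I have in mind, sketched with the standard library's `sqlite3` so it runs on its own. The table names, columns, and the `clear_dataset_tables` helper are hypothetical stand-ins; the real helper is the `clear_db_datasets` linked above, and its signature may differ.

```python
import sqlite3


def clear_dataset_tables(conn):
    # Hypothetical stand-in for a cleanup helper: wipe the dataset tables
    # so the test starts from a known-empty state.
    conn.execute("DELETE FROM dataset_dag_run_queue")
    conn.execute("DELETE FROM dataset_event")
    conn.commit()


def count_rows(conn, table):
    # Table names here are fixed literals from this sketch, not user input.
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dataset_dag_run_queue (dataset_id INTEGER, target_dag_id TEXT)"
)
conn.execute("CREATE TABLE dataset_event (dataset_id INTEGER)")

# Simulate a row left behind by an earlier test.
conn.execute("INSERT INTO dataset_dag_run_queue VALUES (1, 'other_dag')")

clear_dataset_tables(conn)

# ... the skipped task would run here and should write nothing ...

queue_rows = count_rows(conn, "dataset_dag_run_queue")
event_rows = count_rows(conn, "dataset_event")

# With a clean start, the assertion can be on the whole table.
assert queue_rows == 0
assert event_rows == 0
```

Starting from empty tables means the assertions do not need a filter on a specific dataset or DAG, and they would also catch rows written for the wrong target.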



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
