ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685764959



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -471,6 +476,14 @@ def _per_task_process(key, ti, session=None):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(State.SCHEDULED, session=session)
+                elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
+                    # Rerun succeeded tasks
+                    self.log.error("Task instance {ti} with state 
{state}".format(ti=ti,
+                                                                   
state=ti.state))

Review comment:
       ```suggestion
                       self.log.info("Task instance %s with state %s 
succeeded", ti,ti.state)
   ```
   This should not be an error log, right?

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -612,6 +640,7 @@ def test_backfill_rerun_upstream_failed_tasks(self, 
dag_maker):
             op1.set_upstream(op2)
         dag_maker.create_dagrun()
 
+        dag.clear()

Review comment:
       ```suggestion
   ```
   ??

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -719,7 +748,7 @@ def test_backfill_retry_always_failed_task(self, dag_maker):
             start_date=DEFAULT_DATE,
             end_date=DEFAULT_DATE,
         )
-        with pytest.raises(BackfillUnfinished):
+        with self.assertRaises(BackfillUnfinished):

Review comment:
       ```suggestion
           with pytest.raises((BackfillUnfinished):
   ```
   We no longer use unittest model on these tests, so this won't work

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -740,6 +769,8 @@ def test_backfill_ordered_concurrent_execute(self, 
dag_maker):
             op3.set_downstream(op4)
         dag_maker.create_dagrun()
 
+        dag.clear()
+

Review comment:
       ```suggestion
   ```

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -477,6 +471,8 @@ def test_backfill_pool_not_found(self, dag_maker):
         except AirflowException:
             return
 
+        self.fail()

Review comment:
       ```suggestion
   ```
   Any reason for adding this?

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -287,10 +284,7 @@ def test_backfill_conf(self, dag_maker):
         )
         job.run()
 
-        # We ignore the first dag_run created by fixture
-        dr = DagRun.find(
-            dag_id='test_backfill_conf', execution_start_date=DEFAULT_DATE + 
datetime.timedelta(days=1)
-        )
+        dr = DagRun.find(dag_id='test_backfill_conf')

Review comment:
       ```suggestion
           # We ignore the first dag_run created by fixture
           dr = DagRun.find(
               dag_id='test_backfill_conf', execution_start_date=DEFAULT_DATE + 
datetime.timedelta(days=1)
           )
   ```
   Does this affect your change?

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -929,8 +960,11 @@ def run_backfill(cond):
                     dag_maker.create_dagrun(
                         # Existing dagrun that is not within the backfill range
                         run_id=run_id,
+                        state=State.RUNNING,
                         execution_date=DEFAULT_DATE + 
datetime.timedelta(hours=1),
+                        start_date=DEFAULT_DATE,

Review comment:
       dag_maker has a default state=`State.RUNNING` and 
start_date=DEFAULT_DATE. Why are we adding this? Less code is better...
   I don't think you should change this test

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -37,7 +37,7 @@
     TaskConcurrencyLimitReached,
 )
 from airflow.jobs.backfill_job import BackfillJob
-from airflow.models import DagBag, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, Pool, TaskInstance as TI

Review comment:
       ```suggestion
   from airflow.models import DagBag, Pool, TaskInstance as TI
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to