potiuk commented on PR #32991:
URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666667269

   OK. More food for thought: 
   
   
https://github.com/apache/airflow/actions/runs/5773634484/job/15649876570?pr=32991#step:6:10818
   
   The test failed there but with a different error now after the rollback.
   
   So the rollback did not **work around** it this time, but it actually skipped 
the job.
   
   
   Attempting to simply add additional retries for the failed transaction. Not 
perfect, not beautiful, but 🤷 
   
   
   ```
   __________________ TestDaskExecutor.test_backfill_integration 
__________________
   
   self = <tests.providers.daskexecutor.test_dask_executor.TestDaskExecutor 
object at 0x7fd56b88afa0>
   
       @pytest.mark.execution_timeout(180)
       def test_backfill_integration(self):
           """
           Test that DaskExecutor can be used to backfill example dags
           """
           dag = self.dagbag.get_dag("example_bash_operator")
       
           job = Job(
               
executor=DaskExecutor(cluster_address=self.cluster.scheduler_address),
           )
           job_runner = BackfillJobRunner(
               job=job,
               dag=dag,
               start_date=DEFAULT_DATE,
               end_date=DEFAULT_DATE,
               ignore_first_depends_on_past=True,
           )
   >       run_job(job=job, execute_callable=job_runner._execute)
   
   tests/providers/daskexecutor/test_dask_executor.py:124: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
_ _ 
   airflow/utils/session.py:77: in wrapper
       return func(*args, session=session, **kwargs)
   airflow/jobs/job.py:280: in run_job
       return execute_job(job, execute_callable=execute_callable)
   airflow/jobs/job.py:309: in execute_job
       ret = execute_callable()
   airflow/utils/session.py:77: in wrapper
       return func(*args, session=session, **kwargs)
                       "Backfill cannot be created for DagRun %s in %s, as 
there's already %s in a RUNNING "
                       "state.",
                       run.run_id,
                       run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
                       run.run_type,
                   )
               self.log.error(
                   "Changing DagRun into BACKFILL would cause scheduler to lose 
track of executing "
                   "tasks. Not changing DagRun type into BACKFILL, and trying 
insert another DagRun into "
                   "database would cause database constraint violation for 
dag_id + execution_date "
                   "combination. Please adjust backfill dates or wait for this 
DagRun to finish.",
               )
               return
           # picklin'
           pickle_id = None
       
           executor_class, _ = ExecutorLoader.import_default_executor_cls()
       
           if not self.donot_pickle and executor_class.supports_pickling:
               pickle = DagPickle(self.dag)
               session.add(pickle)
               session.commit()
               pickle_id = pickle.id
       
           executor = self.job.executor
           executor.job_id = self.job.id
           executor.start()
       
           ti_status.total_runs = len(dagrun_infos)  # total dag runs in 
backfill
       
           try:
               remaining_dates = ti_status.total_runs
               while remaining_dates > 0:
                   dagrun_infos_to_process = [
                       dagrun_info
                       for dagrun_info in dagrun_infos
                       if dagrun_info.logical_date not in 
ti_status.executed_dag_run_dates
                   ]
                   self._execute_dagruns(
                       dagrun_infos=dagrun_infos_to_process,
                       ti_status=ti_status,
                       executor=executor,
                       pickle_id=pickle_id,
                       start_date=start_date,
                       session=session,
                   )
       
                   remaining_dates = ti_status.total_runs - 
len(ti_status.executed_dag_run_dates)
                   err = "".join(self._collect_errors(ti_status=ti_status, 
session=session))
                   if err:
                       if not self.continue_on_failures or ti_status.deadlocked:
   >                       raise BackfillUnfinished(err, ti_status)
   E                       airflow.exceptions.BackfillUnfinished: Some task 
instances failed:
   E                       DAG ID                 Task ID         Run ID        
                         Try number
   E                       ---------------------  --------------  
-----------------------------------  ------------
   E                       example_bash_operator  run_this_last   
backfill__2017-01-01T00:00:00+00:00             1
   E                       example_bash_operator  this_will_skip  
backfill__2017-01-01T00:00:00+00:00             1
   
   airflow/jobs/backfill_job_runner.py:937: BackfillUnfinished
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to