potiuk commented on PR #32991: URL: https://github.com/apache/airflow/pull/32991#issuecomment-1666667269
OK. More food for thought: https://github.com/apache/airflow/actions/runs/5773634484/job/15649876570?pr=32991#step:6:10818 The test failed there, but with a different error now after the rollback. So the rollback did not **work around** it this time, but it actually skipped the job. Attempting to simply add additional retries for the failed transaction. Not perfect, not beautiful but 🤷 ``` __________________ TestDaskExecutor.test_backfill_integration __________________ self = <tests.providers.daskexecutor.test_dask_executor.TestDaskExecutor object at 0x7fd56b88afa0> @pytest.mark.execution_timeout(180) def test_backfill_integration(self): """ Test that DaskExecutor can be used to backfill example dags """ dag = self.dagbag.get_dag("example_bash_operator") job = Job( executor=DaskExecutor(cluster_address=self.cluster.scheduler_address), ) job_runner = BackfillJobRunner( job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_first_depends_on_past=True, ) > run_job(job=job, execute_callable=job_runner._execute) tests/providers/daskexecutor/test_dask_executor.py:124: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ airflow/utils/session.py:77: in wrapper return func(*args, session=session, **kwargs) airflow/jobs/job.py:280: in run_job return execute_job(job, execute_callable=execute_callable) airflow/jobs/job.py:309: in execute_job ret = execute_callable() airflow/utils/session.py:77: in wrapper return func(*args, session=session, **kwargs) "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING " "state.", run.run_id, run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"), run.run_type, ) self.log.error( "Changing DagRun into BACKFILL would cause scheduler to lose track of executing " "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into " "database would cause database constraint violation for dag_id + execution_date " "combination. 
Please adjust backfill dates or wait for this DagRun to finish.", ) return # picklin' pickle_id = None executor_class, _ = ExecutorLoader.import_default_executor_cls() if not self.donot_pickle and executor_class.supports_pickling: pickle = DagPickle(self.dag) session.add(pickle) session.commit() pickle_id = pickle.id executor = self.job.executor executor.job_id = self.job.id executor.start() ti_status.total_runs = len(dagrun_infos) # total dag runs in backfill try: remaining_dates = ti_status.total_runs while remaining_dates > 0: dagrun_infos_to_process = [ dagrun_info for dagrun_info in dagrun_infos if dagrun_info.logical_date not in ti_status.executed_dag_run_dates ] self._execute_dagruns( dagrun_infos=dagrun_infos_to_process, ti_status=ti_status, executor=executor, pickle_id=pickle_id, start_date=start_date, session=session, ) remaining_dates = ti_status.total_runs - len(ti_status.executed_dag_run_dates) err = "".join(self._collect_errors(ti_status=ti_status, session=session)) if err: if not self.continue_on_failures or ti_status.deadlocked: > raise BackfillUnfinished(err, ti_status) E airflow.exceptions.BackfillUnfinished: Some task instances failed: E DAG ID Task ID Run ID Try number E --------------------- -------------- ----------------------------------- ------------ E example_bash_operator run_this_last backfill__2017-01-01T00:00:00+00:00 1 E example_bash_operator this_will_skip backfill__2017-01-01T00:00:00+00:00 1 airflow/jobs/backfill_job_runner.py:937: BackfillUnfinished ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
