[ https://issues.apache.org/jira/browse/AIRFLOW-4527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862884#comment-16862884 ]
Bharath Palaksha commented on AIRFLOW-4527: ------------------------------------------- This defect was found on 1.10.2 and it is likely present in the latest version as well. To reproduce this defect, we can hardcode a DB connection error. Below is the code where I have done that to reproduce it. Code version - 1.10.2 File - models.py method - "_run_raw_task" hard-coded exception in bold *def _run_raw_task(* self, mark_success=False, test_mode=False, job_id=None, pool=None, session=None): """ Immediately runs the task (without checking or changing db state before execution) and then sets the appropriate final state after completion and runs any post-execute callbacks. Meant to be called only after another function changes the state to running. :param mark_success: Don't run the task, mark its state as success :type mark_success: boolean :param test_mode: Doesn't record success or failure in the DB :type test_mode: boolean :param pool: specifies the pool to use to run the task instance :type pool: str """ task = self.task self.pool = pool or task.pool self.test_mode = test_mode self.refresh_from_db(session=session) self.job_id = job_id self.hostname = get_hostname() self.operator = task.__class__.__name__ context = {} try: if not mark_success: context = self.get_template_context() task_copy = copy.copy(task) self.task = task_copy def signal_handler(signum, frame): self.log.error("Received SIGTERM. 
Terminating subprocesses.") task_copy.on_kill() raise AirflowException("Task received SIGTERM signal") signal.signal(signal.SIGTERM, signal_handler) # Don't clear Xcom until the task is certain to execute self.clear_xcom_data() self.render_templates() task_copy.pre_execute(context=context) # If a timeout is specified for the task, make it fail # if it goes beyond result = None if task_copy.execution_timeout: try: with timeout(int( task_copy.execution_timeout.total_seconds())): result = task_copy.execute(context=context) except AirflowTaskTimeout: task_copy.on_kill() raise else: result = task_copy.execute(context=context) # If the task returns a result, push an XCom containing it if result is not None: self.xcom_push(key=XCOM_RETURN_KEY, value=result) # TODO remove deprecated behavior in Airflow 2.0 try: task_copy.post_execute(context=context, result=result) except TypeError as e: if 'unexpected keyword argument' in str(e): warnings.warn( 'BaseOperator.post_execute() now takes two ' 'arguments, `context` and `result`, but "{}" only ' 'expected one. 
This behavior is deprecated and ' 'will be removed in a future version of ' 'Airflow.'.format(self.task_id), category=DeprecationWarning) task_copy.post_execute(context=context) else: raise Stats.incr('operator_successes_{}'.format( self.task.__class__.__name__), 1, 1) Stats.incr('ti_successes') self.refresh_from_db(lock_for_update=True) self.state = State.SUCCESS *raise OperationalError - # hard-coded exception; it can occur here when the task is a success - Line no 1683* except AirflowSkipException: self.refresh_from_db(lock_for_update=True) self.state = State.SKIPPED except AirflowRescheduleException as reschedule_exception: self.refresh_from_db() self._handle_reschedule(reschedule_exception, test_mode, context) return except AirflowException as e: self.refresh_from_db() *raise OperationalError - # hard-coded exception; it can occur here when the task is a failure - Line no 1693* # for case when task is marked as success/failed externally # current behavior doesn't hit the success callback if self.state in {State.SUCCESS, State.FAILED}: return else: self.handle_failure(e, test_mode, context) raise except (Exception, KeyboardInterrupt) as e: self.handle_failure(e, test_mode, context) raise # Success callback try: if task.on_success_callback: task.on_success_callback(context) except Exception as e3: self.log.error("Failed when executing success callback") self.log.exception(e3) # Recording SUCCESS self.end_date = timezone.utcnow() self.set_duration() if not test_mode: session.add(Log(self.state, self)) session.merge(self) session.commit() > Connection error while calling refresh_from_db() makes the task stuck in > running state > ------------------------------------------------------------------------------------ > > Key: AIRFLOW-4527 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4527 > Project: Apache Airflow > Issue Type: Bug > Components: database > Reporter: Bharath Palaksha > Assignee: Bharath Palaksha > Priority: Major > Labels: mysql > Fix For: 1.10.2 > > > {{I have 
set up Airflow with MySQL as the metastore. When there is a network issue > and a task fails with a network connection reset exception, Airflow tries to > refresh the status from the DB and gets a connection error - this results in the task > getting stuck in the running state.}} > {{There is no retry for a MySQL connection error, and it never handles the > exception}} > If worker nodes are unable to reach MySQL to update task status, the scheduler > node should handle this scenario and mark those tasks as failed. Tasks shouldn't > be stuck in the running state forever. > > Scheduler heartbeat got an exception: (MySQLdb._exceptions.OperationalError) > (2013, "Lost connection to MySQL server at 'reading authorization packet', > system error: 104") (Background on this error at: [http://sqlalche.me/e/e3q8]) > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data Traceback (most recent > call last): > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/bin/airflow", line 32, in <module> > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data args.func(args) > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in > wrapper > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data return f(*args, **kwargs) > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 526, in run > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data _run(args, dag, ti) > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 445, in _run > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data pool=args.pool, > {base_task_runner.py:101} > INFO - Job 989226: Subtask 
count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in > wrapper > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data return func(*args, > **kwargs) > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1692, in > _run_raw_task > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data self.refresh_from_db() > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in > wrapper > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data return func(*args, > **kwargs) > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data File > "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1218, in > refresh_from_db > {base_task_runner.py:101} > INFO - Job 989226: Subtask count_cust_shipped_data ti = qry.first() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)