[ https://issues.apache.org/jira/browse/AIRFLOW-4527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862884 ]

Bharath Palaksha commented on AIRFLOW-4527:
-------------------------------------------

This defect was found on 1.10.2 and is likely present in the latest version as well.
To reproduce it, we can hard-code a DB connection error.

Below is the code where I have done that to reproduce the issue.

Code version - 1.10.2

File - models.py

Method - "_run_raw_task"

The hard-coded exceptions are marked with comments in the code below.

 

def _run_raw_task(
        self,
        mark_success=False,
        test_mode=False,
        job_id=None,
        pool=None,
        session=None):
    """
    Immediately runs the task (without checking or changing db state
    before execution) and then sets the appropriate final state after
    completion and runs any post-execute callbacks. Meant to be called
    only after another function changes the state to running.

    :param mark_success: Don't run the task, mark its state as success
    :type mark_success: boolean
    :param test_mode: Doesn't record success or failure in the DB
    :type test_mode: boolean
    :param pool: specifies the pool to use to run the task instance
    :type pool: str
    """
    task = self.task
    self.pool = pool or task.pool
    self.test_mode = test_mode
    self.refresh_from_db(session=session)
    self.job_id = job_id
    self.hostname = get_hostname()
    self.operator = task.__class__.__name__

    context = {}
    try:
        if not mark_success:
            context = self.get_template_context()

            task_copy = copy.copy(task)
            self.task = task_copy

            def signal_handler(signum, frame):
                self.log.error("Received SIGTERM. Terminating subprocesses.")
                task_copy.on_kill()
                raise AirflowException("Task received SIGTERM signal")
            signal.signal(signal.SIGTERM, signal_handler)

            # Don't clear Xcom until the task is certain to execute
            self.clear_xcom_data()

            self.render_templates()
            task_copy.pre_execute(context=context)

            # If a timeout is specified for the task, make it fail
            # if it goes beyond
            result = None
            if task_copy.execution_timeout:
                try:
                    with timeout(int(
                            task_copy.execution_timeout.total_seconds())):
                        result = task_copy.execute(context=context)
                except AirflowTaskTimeout:
                    task_copy.on_kill()
                    raise
            else:
                result = task_copy.execute(context=context)

            # If the task returns a result, push an XCom containing it
            if result is not None:
                self.xcom_push(key=XCOM_RETURN_KEY, value=result)

            # TODO remove deprecated behavior in Airflow 2.0
            try:
                task_copy.post_execute(context=context, result=result)
            except TypeError as e:
                if 'unexpected keyword argument' in str(e):
                    warnings.warn(
                        'BaseOperator.post_execute() now takes two '
                        'arguments, `context` and `result`, but "{}" only '
                        'expected one. This behavior is deprecated and '
                        'will be removed in a future version of '
                        'Airflow.'.format(self.task_id),
                        category=DeprecationWarning)
                    task_copy.post_execute(context=context)
                else:
                    raise

            Stats.incr('operator_successes_{}'.format(
                self.task.__class__.__name__), 1, 1)
            Stats.incr('ti_successes')
        self.refresh_from_db(lock_for_update=True)
        self.state = State.SUCCESS

        # Hard-coded exception: simulates a DB connection error after a
        # successful task (models.py line 1683).
        # Requires: from sqlalchemy.exc import OperationalError
        raise OperationalError("simulated connection error", {},
                               Exception("connection reset"))
    except AirflowSkipException:
        self.refresh_from_db(lock_for_update=True)
        self.state = State.SKIPPED
    except AirflowRescheduleException as reschedule_exception:
        self.refresh_from_db()
        self._handle_reschedule(reschedule_exception, test_mode, context)
        return
    except AirflowException as e:
        self.refresh_from_db()
        # Hard-coded exception: simulates a DB connection error after a
        # failed task (models.py line 1693). The code below it is
        # unreachable while the injected raise is in place.
        raise OperationalError("simulated connection error", {},
                               Exception("connection reset"))
        # for case when task is marked as success/failed externally
        # current behavior doesn't hit the success callback
        if self.state in {State.SUCCESS, State.FAILED}:
            return
        else:
            self.handle_failure(e, test_mode, context)
            raise
    except (Exception, KeyboardInterrupt) as e:
        self.handle_failure(e, test_mode, context)
        raise

    # Success callback
    try:
        if task.on_success_callback:
            task.on_success_callback(context)
    except Exception as e3:
        self.log.error("Failed when executing success callback")
        self.log.exception(e3)

    # Recording SUCCESS
    self.end_date = timezone.utcnow()
    self.set_duration()
    if not test_mode:
        session.add(Log(self.state, self))
        session.merge(self)
    session.commit()

> Connection error while calling refresh_from_db() makes the task stuck in
> running state
> ------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-4527
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4527
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: database
>            Reporter: Bharath Palaksha
>            Assignee: Bharath Palaksha
>            Priority: Major
>              Labels: mysql
>             Fix For: 1.10.2
>
>
> I have set up Airflow with MySQL as the metastore. When there is a network
> issue and a task fails with a connection reset exception, Airflow tries to
> refresh the task status from the DB and gets a connection error. This results
> in the task getting stuck in the running state.
> There is no retry for MySQL connection errors, and the exception is never
> handled.
> If worker nodes are unable to reach MySQL to update task status, the scheduler
> node should handle this scenario and mark those tasks failed. Tasks shouldn't
> be stuck in the running state forever.
>  
> Scheduler heartbeat got an exception: (MySQLdb._exceptions.OperationalError) (2013, "Lost connection to MySQL server at 'reading authorization packet', system error: 104") (Background on this error at: http://sqlalche.me/e/e3q8)
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data Traceback (most recent call last):
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/bin/airflow", line 32, in <module>
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     args.func(args)
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     return f(*args, **kwargs)
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 526, in run
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     _run(args, dag, ti)
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 445, in _run
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     pool=args.pool,
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in wrapper
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     return func(*args, **kwargs)
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1692, in _run_raw_task
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     self.refresh_from_db()
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in wrapper
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     return func(*args, **kwargs)
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1218, in refresh_from_db
> {base_task_runner.py:101} INFO - Job 989226: Subtask count_cust_shipped_data     ti = qry.first()


