Uwe Jentsch created AIRFLOW-6486:
------------------------------------

             Summary: SubdagOperator fails when manually set to success 
                 Key: AIRFLOW-6486
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6486
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG
    Affects Versions: 1.10.7
            Reporter: Uwe Jentsch
         Attachments: dag_subdag_2020-01-05T04_00_00+00_00_1.log, dag_subdag_2020-01-05T04_00_00+00_00_2.log, mvp_dag.py

Hello,

we sometimes want to set specific operators of our DAG to success. After upgrading from version 1.10.1 to 1.10.7, subdag operators fail after we set them to success, even though all of the tasks inside the subdag are correctly set to the success state.

Here are the two exceptions we get; I'll attach the full log of each to this ticket.

 
{code:java}
[2020-01-06 11:46:09,706] {taskinstance.py:1088} ERROR - [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 788, in _execute
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 718, in _execute_for_run_dates
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 592, in _process_backfill_task_instances
    self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 188, in heartbeat
    sleep(sleep_for)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 932, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 749, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/subdag_operator.py", line 102, in execute
    executor=self.executor)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1290, in run
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 221, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 812, in _execute
    executor.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 234, in end
    self.impl.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 210, in end
    self.queue.put((None, None))
  File "<string>", line 2, in put
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 753, in _callmethod
    self._connect()
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 740, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 487, in Client
    c = SocketClient(address)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 614, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
[2020-01-06 11:46:09,775] {taskinstance.py:1119} INFO - Marking task as FAILED.
{code}
{code:java}
[2020-01-06 12:07:57,269] {taskinstance.py:1088} ERROR - 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 788, in _execute
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 718, in _execute_for_run_dates
    session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 592, in _process_backfill_task_instances
    self.heartbeat()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 188, in heartbeat
    sleep(sleep_for)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 932, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/subdag_operator.py", line 102, in execute
    executor=self.executor)
  File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1290, in run
    job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 221, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/backfill_job.py", line 812, in _execute
    executor.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 234, in end
    self.impl.end()
  File "/usr/local/lib/python3.6/site-packages/airflow/executors/local_executor.py", line 213, in end
    self.queue.join()
  File "<string>", line 2, in join
  File "/usr/lib64/python3.6/multiprocessing/managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
[2020-01-06 12:07:57,272] {taskinstance.py:1119} INFO - Marking task as FAILED.
{code}
 

 

If we zoom into the subdag and set everything to success, the subdag operator itself also ends up in the success state. But if we mark an operator and its downstream as success, this often marks multiple subdag operators as success, and those then fail.
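
The same "mark an operator and its downstream as success" action can also be triggered programmatically instead of through the UI. A minimal sketch, assuming the experimental mark_tasks API that ships with 1.10 (the dag id, task id and execution date here are illustrative; check the signature in your version):

{code:python}
# Sketch only: mimics the UI's "Mark Success" with the "Downstream" option.
# Assumes Airflow 1.10's experimental API; ids and dates are illustrative.
from datetime import datetime

from airflow.api.common.experimental.mark_tasks import set_state
from airflow.models import DagBag
from airflow.utils.state import State

dag = DagBag().get_dag('dag')
task = dag.get_task('start')

# Marks 'start' and everything downstream of it (including any subdag
# operators) as SUCCESS; it is these subdag operators that later fail.
set_state(task=task,
          execution_date=datetime(2020, 1, 5, 4, 0),
          downstream=True,
          state=State.SUCCESS,
          commit=True)
{code}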

 

We are using the LocalExecutor for the DAG as well as for the subdag operator. I'll attach a minimal example (mvp_dag.py) that I could use to reproduce the issue; a rough sketch of its shape follows below.
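
For readers who don't want to download the attachment, a comparably minimal setup looks roughly like this (a sketch only, not the attached file; ids and task names are illustrative):

{code:python}
# Sketch of a minimal parent DAG with one SubDagOperator, both running
# on the LocalExecutor. The attached mvp_dag.py is authoritative.
from datetime import datetime

from airflow import DAG
from airflow.executors.local_executor import LocalExecutor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

DAG_ID = 'dag'
DEFAULT_ARGS = {'start_date': datetime(2020, 1, 1)}


def build_subdag(parent_dag_id, child_id, args):
    # A single trivial task inside the subdag is enough to hit the issue.
    subdag = DAG(dag_id='{}.{}'.format(parent_dag_id, child_id),
                 default_args=args,
                 schedule_interval='@daily')
    DummyOperator(task_id='inner_task', dag=subdag)
    return subdag


with DAG(DAG_ID, default_args=DEFAULT_ARGS, schedule_interval='@daily') as dag:
    start = DummyOperator(task_id='start')
    subdag_task = SubDagOperator(
        task_id='subdag',
        subdag=build_subdag(DAG_ID, 'subdag', DEFAULT_ARGS),
        executor=LocalExecutor(),  # LocalExecutor for the subdag as well
    )
    start >> subdag_task
{code}

Marking 'start' and its downstream as success then puts the 'subdag' task into the failing state described above.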


