[
https://issues.apache.org/jira/browse/AIRFLOW-3877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831661#comment-16831661
]
James Coder edited comment on AIRFLOW-3877 at 5/2/19 4:30 PM:
--------------------------------------------------------------
Looking at jobs.py, line
[1310|https://github.com/apache/airflow/blob/3df044ff70190ff9ecf1584f1b8252ddd25a4307/airflow/jobs.py#L1310]
may be the issue. When generating the command that is passed to the worker, it
uses full_filepath:
{code:python}
def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
                                              simple_task_instances):
    # ...
    file_path=simple_dag.full_filepath,
{code}
which then generates a command like
{code:java}
['airflow', 'run', {DAG_ID}, {TASK_ID}, '-sd', '{FULL PATH TO DAG ON
SCHEDULER}']
{code}
When the worker picks it up, it uses the path from -sd, which is the path on the
scheduler host. If the DAGS_FOLDER on the worker host is different, it blows up.
E.g. with full_filepath = /usr/local/airflow/dags/example.dag,
if your worker runs out of /home/airflow/airflow/dags, it throws an error.
It looks like there is an attempt to handle this in cli.py,
[process_subdir|https://github.com/apache/airflow/blob/3df044ff70190ff9ecf1584f1b8252ddd25a4307/airflow/bin/cli.py#L130],
but the command passed from the scheduler doesn't have DAGS_FOLDER in the
path; it has an actual path, so this does nothing:
{code:python}
def process_subdir(subdir):
    if subdir:
        subdir = subdir.replace('DAGS_FOLDER', DAGS_FOLDER)
        subdir = os.path.abspath(os.path.expanduser(subdir))
        return subdir
{code}
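To make the no-op concrete, here is the same logic with DAGS_FOLDER parameterized so it can be run in isolation (the folder values are made up for illustration, not taken from any real deployment): a DAGS_FOLDER token resolves against the local folder, while an absolute path from the scheduler passes through unchanged.
{code:python}
import os

WORKER_DAGS_FOLDER = '/home/airflow/airflow/dags'  # made-up worker-side value

def process_subdir(subdir, dags_folder=WORKER_DAGS_FOLDER):
    # Same logic as cli.py's process_subdir, with DAGS_FOLDER passed in
    # so the worker-side behavior can be demonstrated standalone.
    if subdir:
        subdir = subdir.replace('DAGS_FOLDER', dags_folder)
        subdir = os.path.abspath(os.path.expanduser(subdir))
        return subdir

# A token-style path (what 1.7.3 enqueued) resolves locally:
assert process_subdir('DAGS_FOLDER/example.py') == \
    '/home/airflow/airflow/dags/example.py'

# The absolute path that 1.10.1 enqueues passes through untouched,
# so the worker goes looking for the scheduler's path:
assert process_subdir('/usr/local/airflow/dags/example.py') == \
    '/usr/local/airflow/dags/example.py'
{code}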
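For what it's worth, one direction a fix could take (just a sketch with a hypothetical helper name, not Airflow's actual code): have the scheduler rewrite full_filepath back into a DAGS_FOLDER-relative token before enqueueing, so the worker-side process_subdir can resolve it against its own DAGS_FOLDER.
{code:python}
import os

def tokenize_dag_path(full_filepath, dags_folder):
    # Hypothetical helper (not in Airflow): replace the scheduler-local
    # DAGS_FOLDER prefix with the literal 'DAGS_FOLDER' token so the
    # worker's process_subdir can re-resolve it against its own folder.
    folder = os.path.abspath(dags_folder)
    path = os.path.abspath(full_filepath)
    if path.startswith(folder + os.sep):
        return 'DAGS_FOLDER' + path[len(folder):]
    return path  # DAG lives outside DAGS_FOLDER: leave the path as-is

# '/usr/local/airflow/dags/example.py' -> 'DAGS_FOLDER/example.py'
{code}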
> Scheduler sending the absolute path to celery
> ---------------------------------------------
>
> Key: AIRFLOW-3877
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3877
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.10.1
> Reporter: Vipul Pandey
> Priority: Major
>
> Hi,
> Upgraded the airflow version from 1.7.3 to 1.10.1. After upgrading the
> scheduler, webserver and workers, the DAGs have stopped working, showing the
> following error on the scheduler:
> {{Either the dag did not exist or it failed to parse.}}
> I have not made any changes to the config. While investigating, the
> scheduler logs show the issue. Earlier the scheduler ran the task as:
> Adding to queue: airflow run <dag_id> <task_id> <execution_date> --local -sd
> DAGS_FOLDER/<dag_filename.py>
> While now it is running with absolute path -
> Adding to queue: airflow run <dag_id> <task_id> <execution_date> --local -sd
> /<PATH_TO_DAGS_FOLDER>/<dag_filename.py>
> PATH_TO_DAGS_FOLDER is like /home/<user>/Airflow/dags...
> which is the same path it pushes to the workers; since the worker runs as a
> different user, it is not able to find the DAG at the specified location.
> I am using mysql as backend and rabbitmq for message passing.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)