Hello everyone,

We are using Airflow to schedule our batch processing jobs, and it has been a great tool for running our DAGs.
Recently we have been trying to package our DAGs along with their dependencies into a zip. This zip is uploaded to GCS, and we then sync the DAGs from GCS and mount them into a PersistentVolumeClaim (PVC) on Kubernetes. Since these DAGs have to be synced from GCS to Airflow, we have tried a few strategies, but nothing has worked reliably. We are currently running *airflow version 1.10.2*.

Our current setup is as follows. As this involves multiple components, I have split it into steps; rough sketches of the packaging and sync commands follow the step descriptions.

*Step 1*. Syncing dags using GCSFuse

We bundle the dependencies along with the DAGs and upload the zip to GCS. gcsfuse (https://github.com/GoogleCloudPlatform/gcsfuse) is used to sync the DAGs into Airflow's dags_folder. We have created an airflow user with UID and GID of 1000, and the dags folder has been created by the airflow user. We run the following command to sync the DAGs (the bucket name is shown here as a placeholder):

gcsfuse --uid=1000 --gid=1000 --key-file my-secret.json <our-dags-bucket> /home/airflow/airflow/dags

*Step 2*. Mounting the dags to PVC

We are not able to mount this dags folder onto our PVC, possibly because the folder has been created by the airflow user while the mount expects it to be owned by root. So, in order to mount the folder to the PVC, we periodically (every 1 minute) copy the contents of the dags folder to a dagsmount folder using rsync:

rsync -r --delete dags/ dagsmount/

The dagsmount folder is created by the root user, so the dagsmount folder can be synced to the PVC, while the dags folder cannot.
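To make the packaging step in Step 1 concrete, the zip build and upload look roughly like this (the DAG file, dependency folder, and bucket names below are placeholders, not our real ones):

# Bundle the DAG definition files together with their dependencies.
# The DAG .py files sit at the root of the zip so that Airflow can discover them.
zip -r packaged_dags.zip my_dag.py deps/

# Upload the packaged DAGs to the GCS bucket that gcsfuse mounts in Step 1.
gsutil cp packaged_dags.zip gs://<our-dags-bucket>/packaged_dags.zip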
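And the periodic copy from Step 2 runs roughly like this (shown as a simple loop for illustration; in our setup it is triggered every minute):

# Copy the gcsfuse-mounted dags folder into the root-owned dagsmount folder
# that is synced to the PVC, once per minute.
while true; do
  rsync -r --delete dags/ dagsmount/
  sleep 60
done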
With this setup, we are intermittently getting either "dag_id not found" or "zip file doesn't exist" errors. Our current assumption as to why we get these errors is that, since we are continuously syncing the DAGs, reads and writes happen on the same files in parallel, and this causes the errors.

Here is a sample stack trace we get for these errors:

[2019-08-10 12:00:36,754] {models.py:412} ERROR - Failed to import: /home/airflow/airflow/dags/packaged_dags.zip
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models.py", line 409, in process_file
    m = importlib.import_module(mod_name)
  File "/usr/local/lib/python3.7/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 963, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 906, in _find_spec
  File "<frozen importlib._bootstrap_external>", line 1280, in find_spec
  File "<frozen importlib._bootstrap_external>", line 1254, in _get_spec
  File "<frozen importlib._bootstrap_external>", line 1235, in _legacy_get_spec
  File "<frozen importlib._bootstrap>", line 441, in spec_from_loader
  File "<frozen importlib._bootstrap_external>", line 594, in spec_from_file_location
FileNotFoundError: [Errno 2] No such file or directory: '/home/airflow/airflow/dags/packaged_dags.zip'
DEBUG - Calling callbacks: []
INFO - Job 3063: Subtask export_bigquery_wallet_table /usr/local/lib/python3.7/site-packages/airflow/utils/helpers.py:356: DeprecationWarning: Importing 'BashOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
INFO - Job 3063: Subtask export_bigquery_wallet_table DeprecationWarning)
INFO - Job 3063: Subtask export_bigquery_wallet_table /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py:172: DeprecationWarning: Deprecated parameter `bql` used in Task id: export_bigquery_brand_table. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
INFO - Job 3063: Subtask export_bigquery_wallet_table category=DeprecationWarning)
INFO - Job 3063: Subtask export_bigquery_wallet_table /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py:172: DeprecationWarning: Deprecated parameter `bql` used in Task id: export_bigquery_brand_peak_order_time_table. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
INFO - Job 3063: Subtask export_bigquery_wallet_table category=DeprecationWarning)
INFO - Job 3063: Subtask export_bigquery_wallet_table [2019-08-10 12:00:36,760] {settings.py:201} DEBUG - Disposing DB connection pool (PID 37)
INFO - Job 3063: Subtask export_bigquery_wallet_table Traceback (most recent call last):
INFO - Job 3063: Subtask export_bigquery_wallet_table   File "/usr/local/bin/airflow", line 32, in <module>
INFO - Job 3063: Subtask export_bigquery_wallet_table     args.func(args)
INFO - Job 3063: Subtask export_bigquery_wallet_table   File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
INFO - Job 3063: Subtask export_bigquery_wallet_table     return f(*args, **kwargs)
INFO - Job 3063: Subtask export_bigquery_wallet_table   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 504, in run
INFO - Job 3063: Subtask export_bigquery_wallet_table     dag = get_dag(args)
INFO - Job 3063: Subtask export_bigquery_wallet_table   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 149, in get_dag
INFO - Job 3063: Subtask export_bigquery_wallet_table     'parse.'.format(args.dag_id))
INFO - Job 3063: Subtask export_bigquery_wallet_table airflow.exceptions.AirflowException: dag_id could not be found: customer_summary_integration.customer_profile. Either the dag did not exist or it failed to parse.

I have tried to cover all details; let me know if anything is unclear.

--
Thanks and Regards,
Maulik Soneji