BobasB opened a new issue #9722:
URL: https://github.com/apache/airflow/issues/9722


   Hi, I am seeing very strange and specific behaviour of Airflow on an AWS EKS cluster after deploying Calico to enforce network policies. I have also opened an AWS support case, but I need help from the Airflow team as well. I would greatly appreciate any pointers.
   **What happened**:
   I have an Airflow setup running as 2 Kubernetes pods (webserver and scheduler). Both pods use a git-sync sidecar container to pull DAGs from git and store them on a Kubernetes `emptyDir` volume. Everything works without errors on a fresh EKS cluster. But as soon as Calico (https://docs.aws.amazon.com/eks/latest/userguide/calico.html) is deployed to the cluster, all DAGs with local imports break. Airflow has a default Kubernetes NetworkPolicy that allows all ingress/egress traffic without restrictions, and the Airflow UI is still accessible. However, the UI shows the message `DAG "helloWorld" seems to be missing.` and the webserver starts logging this error:
   ```
   [2020-07-08 14:43:38,784] {__init__.py:51} INFO - Using executor SequentialExecutor
   [2020-07-08 14:43:38,784] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/repo
   [2020-07-08 14:43:38,785] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
   [2020-07-08 14:43:39,016] {dagbag.py:239} ERROR - Failed to import: /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 236, in process_file
       m = imp.load_source(mod_name, filepath)
     File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
       module = _load(spec)
     File "<frozen importlib._bootstrap>", line 696, in _load
     File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
     File "<frozen importlib._bootstrap_external>", line 728, in exec_module
     File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
     File "/usr/local/airflow/dags/repo/airflow_dags/dag_test.py", line 5, in <module>
       from airflow_dags.common import DEFAULT_ARGS
   ModuleNotFoundError: No module named 'airflow_dags'
   ```
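
   To illustrate where this fails: the traceback shows `dagbag.py` loading the DAG file with `imp.load_source`, which executes `dag_test.py` as a standalone module, so the absolute import `from airflow_dags.common import DEFAULT_ARGS` can only resolve if the repo root is on `sys.path` of the process that parses the file. A rough sketch of that mechanism (my own illustration, not the actual Airflow code):
   ```
   # Rough illustration of what dagbag.process_file does with a DAG file.
   # imp.load_source() executes the file as a top-level module; the absolute import
   # inside it is resolved against sys.path, so the repo root must be importable.
   import imp
   import sys

   repo_root = "/usr/local/airflow/dags/repo"      # the dags folder from the logs above
   dag_file = repo_root + "/airflow_dags/dag_test.py"

   if repo_root not in sys.path:                   # normally provided via PYTHONPATH
       sys.path.insert(0, repo_root)

   module = imp.load_source("dag_test", dag_file)  # same call as dagbag.py line 236
   print(module.dag.dag_id)                        # prints "helloWorld" when the import resolves
   ```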
   
   The DAG itself consists of 2 files, `dag_test.py` and `common.py`. Their contents are:
   `common.py`
   ```
   from datetime import datetime, timedelta
   
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'start_date': datetime(2020, 3, 26),
       'retry_delay': timedelta(minutes=1),
   }
   ```
   
   `dag_test.py` 
   ```
   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   
   from airflow_dags.common import DEFAULT_ARGS
   
   dag = DAG('helloWorld', schedule_interval='*/5 * * * *', 
default_args=DEFAULT_ARGS)
   
   t1 = BashOperator(
       task_id='task_1',
       bash_command='echo "Hello World from Task 1"; sleep 30',
       dag=dag
   )
   ```
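
   For this import to work I assume `airflow_dags` is importable as a package from the repo root (i.e. it has an `__init__.py` and the repo root is on `PYTHONPATH`). A quick hypothetical check (not part of the DAG) of where Python actually resolves the package from:
   ```
   # Hypothetical check: confirm which directory the airflow_dags package resolves from.
   import airflow_dags

   # Expected: /usr/local/airflow/dags/repo/airflow_dags/__init__.py
   print(airflow_dags.__file__)
   ```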
   
   *What I have already tried on the webserver and scheduler pods* (see also the debugging sketch after this list):
   - exec into the Airflow pod and open a Python shell; all imports work fine, for example:
   ```
   airflow@airflow-webserver-78bc695cc7-l7z9s:~$ pwd
   /usr/local/airflow
   airflow@airflow-webserver-78bc695cc7-l7z9s:~$ python
   Python 3.7.4 (default, Oct 17 2019, 06:10:02)
   [GCC 8.3.0] on linux
   Type "help", "copyright", "credits" or "license" for more information.
   >>> from airflow_dags.common import DEFAULT_ARGS
   >>> print(DEFAULT_ARGS)
   {'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.datetime(2020, 3, 26, 0, 0), 'retry_delay': datetime.timedelta(seconds=60)}
   >>>
   ```
   - from a bash shell in the pod, I can run airflow CLI commands such as `list_tasks` and `trigger_dag`, and the DAG is not broken:
   ```
   airflow@airflow-webserver-78bc695cc7-l7z9s:~$ airflow list_tasks helloWorld
   [2020-07-08 15:37:24,309] {settings.py:212} DEBUG - Setting up DB connection pool (PID 275)
   [2020-07-08 15:37:24,310] {settings.py:253} DEBUG - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=275
   [2020-07-08 15:37:24,366] {cli_action_loggers.py:42} DEBUG - Adding <function default_action_log at 0x7fb9b5a4f710> to pre execution callback
   [2020-07-08 15:37:24,817] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x7fb9b5a4f710>]
   [2020-07-08 15:37:24,847] {__init__.py:51} INFO - Using executor SequentialExecutor
   [2020-07-08 15:37:24,848] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/repo
   [2020-07-08 15:37:24,849] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
   [2020-07-08 15:37:25,081] {dagbag.py:363} DEBUG - Loaded DAG <DAG: helloWorld>
   [2020-07-08 15:37:25,082] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dagbg_add.py
   task_1
   [2020-07-08 15:37:25,083] {cli_action_loggers.py:86} DEBUG - Calling callbacks: []
   [2020-07-08 15:37:25,083] {settings.py:278} DEBUG - Disposing DB connection pool (PID 275)
   
   airflow@airflow-webserver-78bc695cc7-l7z9s:~$ airflow trigger_dag helloWorld
   [2020-07-08 15:50:25,446] {settings.py:212} DEBUG - Setting up DB connection pool (PID 717)
   [2020-07-08 15:50:25,446] {settings.py:253} DEBUG - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=717
   [2020-07-08 15:50:25,502] {cli_action_loggers.py:42} DEBUG - Adding <function default_action_log at 0x7fe05c254710> to pre execution callback
   [2020-07-08 15:50:25,986] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x7fe05c254710>]
   [2020-07-08 15:50:26,024] {__init__.py:51} INFO - Using executor SequentialExecutor
   [2020-07-08 15:50:26,024] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
   [2020-07-08 15:50:26,024] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
   [2020-07-08 15:50:26,253] {dagbag.py:363} DEBUG - Loaded DAG <DAG: helloWorld>
   Created <DagRun helloWorld @ 2020-07-08 15:50:26+00:00: manual__2020-07-08T15:50:26+00:00, externally triggered: True>
   [2020-07-08 15:50:26,289] {cli_action_loggers.py:86} DEBUG - Calling callbacks: []
   [2020-07-08 15:50:26,289] {settings.py:278} DEBUG - Disposing DB connection pool (PID 717)
   ```
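
   Since the same import works from an interactive shell and from the CLI but fails in the webserver, the sketch below (my own debugging idea, not output from the issue) compares the path setup of the shell with the environment of the running webserver workers:
   ```
   # Sketch: compare the interactive shell with the running webserver workers.
   import os
   import sys

   print(sys.path)                          # module search path in this interactive shell
   print(os.environ.get("PYTHONPATH"))      # PYTHONPATH inherited by this shell

   # The webserver workers may have been started with a different environment; their
   # variables can be read from /proc (replace <pid> with a real worker PID):
   # with open("/proc/<pid>/environ", "rb") as f:
   #     print([v for v in f.read().split(b"\0") if v.startswith(b"PYTHONPATH")])
   ```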
   
   *To summarise*: on an EKS cluster with Calico network policies, Airflow DAGs that use local imports show up as broken in the UI and in the webserver logs, but they can still be run via a manual trigger from the CLI.
   
   Please help me understand why the DAG imports break in the UI.
   
   **Apache Airflow version**: 1.10.10
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   ```
   Server Version: version.Info{Major:"1", Minor:"15+", GitVersion:"v1.15.11-eks-af3caf", GitCommit:"af3caf6136cd355f467083651cc1010a499f59b1", GitTreeState:"clean", BuildDate:"2020-03-27T21:51:36Z", GoVersion:"go1.12.17", Compiler:"gc", Platform:"linux/amd64"}
   ```
   **Environment**:
   - **Cloud provider or hardware configuration**: AWS, EKS
   - **OS** (e.g. from /etc/os-release):
   EKS worker nodes (EC2 instances):
   ```
   NAME="Amazon Linux"
   VERSION="2"
   ID="amzn"
   ID_LIKE="centos rhel fedora"
   VERSION_ID="2"
   PRETTY_NAME="Amazon Linux 2"
   ANSI_COLOR="0;33"
   CPE_NAME="cpe:2.3:o:amazon:amazon_linux:2"
   HOME_URL="https://amazonlinux.com/";
   ```
   Docker image with Airflow installed:
   ```
   PRETTY_NAME="Debian GNU/Linux 10 (buster)"
   NAME="Debian GNU/Linux"
   VERSION_ID="10"
   VERSION="10 (buster)"
   VERSION_CODENAME=buster
   ID=debian
   HOME_URL="https://www.debian.org/";
   SUPPORT_URL="https://www.debian.org/support";
   BUG_REPORT_URL="https://bugs.debian.org/";
   ```
   - **Kernel** (e.g. `uname -a`):
   ```
   Linux airflow-webserver-78bc695cc7-dmzh2 4.14.181-140.257.amzn2.x86_64 #1 SMP Wed May 27 02:17:36 UTC 2020 x86_64 GNU/Linux
   ```
   - **Install tools**: we use `pipenv` to install Airflow system-wide: `pipenv install --system --deploy --clear`
   - **Others**:
   
   **How to reproduce it**:
   Create an EKS cluster and deploy Calico. Use a DAG with local imports.
   
   **Anything else we need to know**:
   I have all the required environment variables set, such as `AIRFLOW_HOME=/usr/local/airflow`, `AIRFLOW_DAGS_FOLDER=/usr/local/airflow/dags/repo` and `PYTHONPATH=/usr/local/airflow/dags/repo`, and on an EKS cluster without network policies everything works fine.
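
   A quick way to double-check these values from inside the affected pod (my own sketch, not output from the issue):
   ```
   # Print the variables as Python sees them, and the dags_folder Airflow will use.
   import os
   from airflow.configuration import conf

   for var in ("AIRFLOW_HOME", "AIRFLOW_DAGS_FOLDER", "PYTHONPATH"):
       print(var, "=", os.environ.get(var))

   # Should match the path shown in the webserver logs above
   print("dags_folder =", conf.get("core", "dags_folder"))
   ```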

