Well, I did more testing today, using guppy to measure memory consumption
and watching processes and memory with htop while kicking off DAGs. My
test Python object was defined like this:

payload = [1] * (2 * 10 ** 7) # 152 Mb

As Jeremiah said, the entire Python file that generates the DAGs is loaded
for every task instance. In fact, judging from the Airflow logs, it looks
like the file is executed twice during a task instance run (I wonder why).
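
A quick and dirty way to see how many times (and from which process) the
file gets parsed is to log the pid at module level in the DAG file and
count how often the line shows up in the logs - just a throwaway
debugging trick, nothing Airflow-specific:

# put at the very top of the dag-generator file; runs once per parse, per process
import logging
import os

logging.info('dag file parsed by pid {}'.format(os.getpid()))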

To make things worse, the Airflow webserver runs 4 gunicorn processes by
default, and each gunicorn process re-runs my DAG generator file roughly
every 15 seconds, so that large structure is constantly loaded into RAM
four times over, released, and then loaded all over again on the next
refresh.
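
For what it's worth, the knobs for that behavior seem to be 'workers' and
'worker_refresh_interval' in the [webserver] section of airflow.cfg, if I
read the config reference right; the values below are only an
illustration, not a recommendation:

[webserver]
# number of gunicorn processes serving the UI
workers = 2
# seconds to wait before refreshing a batch of workers
# (each refresh re-parses the DAG files)
worker_refresh_interval = 60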

My test 150 MB Python object will blow up RAM utilization by at least 200
times if I kick off all 200 DAGs at once, assuming only a single task is
running at a time per DAG. That is roughly 150 MB x 200, or around 30 GB,
just for copies of this one object.

So lesson learned - do not use large in-memory objects in Airflow DAG
definition files!
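
If the object really has to be built in Python, one pattern that should
avoid most of the bloat is to construct it inside the task callable
instead of at module level, so parsing the DAG file stays cheap and only
the worker process that actually runs the task pays for the allocation.
Just a sketch, untested - build_payload is a name I made up here:

import logging
import sys

def build_payload():
    # hypothetical loader; runs only inside the worker process executing the task
    return [1] * (2 * 10 ** 7)

def test_memory(**kwargs):
    # allocated at task run time, not every time the DAG file is parsed
    payload = build_payload()
    logging.info('payload size {} Mb'.format(sys.getsizeof(payload) / 1024 / 1024))

The PythonOperator would then just point at test_memory, without passing
the payload through op_kwargs.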

This is the code for the test DAG generator with a large Python object:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from pprint import pprint
import logging
import sys
from guppy import hpy
import time

def test_memory(payload, **kwargs):
    logging.info('test_memory: size of payload {} Mb'.format(
        sys.getsizeof(payload) / 1024 / 1024))
    logging.info(hpy().heap())
    time.sleep(15)

def get_dag(i):
    """
    Returns generated DAG object with tasks
    """

    logging.info('get_dag({}): size of payload {} Mb'.format(
        i, sys.getsizeof(payload) / 1024 / 1024))
    logging.info(hpy().heap())

    dag_schedule_interval = None
    dag_start_date = datetime.now()

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': dag_start_date,
        'email': ['airf...@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'catchup': False
    }

    # Change version number if schedule needs to be changed:
    dag = DAG(
        'memory_tester_{}'.format(i),
        default_args=default_args,
        schedule_interval=dag_schedule_interval,
        max_active_runs=1
    )
    # max_active_runs=1 means only one active DAG run at a time. It is enforced by the
    # scheduler only; it does not prevent someone from triggering the DAG manually and
    # running it concurrently.

    t1 = PythonOperator(
        task_id="t1",
        python_callable=test_memory,
        provide_context=True,
        op_kwargs={'payload': payload},
        dag=dag
    )

    t2 = PythonOperator(
        task_id="t2",
        python_callable=test_memory,
        provide_context=True,
        op_kwargs={'payload': payload},
        dag=dag
    )

    t2.set_upstream(t1)

    logging.info('end of get_dag({})'.format(i))
    logging.info(hpy().heap())

    return dag

# payload = [1,2,3] * (2 * 10 ** 7) # 457 Mb
payload = [1] * (2 * 10 ** 7) # 152 Mb

for i in range(3):
    dag = get_dag(i+1)
    globals()[dag.dag_id] = dag
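
Following Jeremiah's suggestion about out-of-core / persistent storage,
the direction I am going to try looks roughly like this - again just a
sketch, and /tmp/payload.pkl is a made-up path that would have to sit on
storage every worker can reach:

import pickle

PAYLOAD_PATH = '/tmp/payload.pkl'  # made-up path; use storage visible to all workers

def save_payload(payload):
    # run once, outside of the DAG file, to persist the big object
    with open(PAYLOAD_PATH, 'wb') as f:
        pickle.dump(payload, f)

def load_payload():
    # called from inside the task callable, so only the running task pays the memory cost
    with open(PAYLOAD_PATH, 'rb') as f:
        return pickle.load(f)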


On Wed, Mar 22, 2017 at 3:50 PM, Boris Tyukin <bo...@boristyukin.com> wrote:

> thanks Jeremiah, this is exactly what was bugging me. I am going to
> rewrite that code and look at persistent storage. your explanation helped,
> thanks!
>
> On Wed, Mar 22, 2017 at 2:29 PM, Jeremiah Lowin <jlo...@apache.org> wrote:
>
>> In vanilla Python, your DAGs will all reference the same object, so when
>> your DAG file is parsed and 200 DAGs are created, there will still only be
>> 1 60MB dict object created (I say vanilla because there are obviously ways
>> to create copies of the object).
>>
>> HOWEVER, you should assume that each Airflow TASK is being run in a
>> different process, and each process is going to load your DAG file when it
>> runs. If resource use is a concern, I suggest you look at out-of-core or
>> persistent storage for the object so you don't need to load the whole
>> thing every time.
>>
>> On Wed, Mar 22, 2017 at 11:20 AM Boris Tyukin <bo...@boristyukin.com>
>> wrote:
>>
>> > hi Jeremiah, thanks for the explanation!
>> >
>> > i am very new to Python so was surprised that it works and my external
>> > dictionary object was still accessible to all dags generated. I think it
>> > makes sense but I would like to confirm one thing and I do not know
>> > how to test it myself.
>> >
>> > do you think that large dictionary object will still be loaded to memory
>> > only once even if I generate 200 dags that will be accessing it? so
>> > basically they will just use a reference to it or they would create
>> > a copy of the same 60Mb structure.
>> >
>> > I hope my question makes sense :)
>> >
>> > On Wed, Mar 22, 2017 at 10:54 AM, Jeremiah Lowin <jlo...@apache.org>
>> > wrote:
>> >
>> > > At the risk of oversimplifying things, your DAG definition file is
>> > > loaded *every* time a DAG (or any task in that DAG) is run. Think of
>> > > it as a literal Python import of your dag-defining module: any
>> > > variables are loaded along with the DAGs, which are then executed.
>> > > That's why your dict is always available. This will work with Celery
>> > > since it follows the same approach, parsing your DAG file to run
>> > > each task.
>> > >
>> > > (By the way, this is why it's critical that all parts of your Airflow
>> > > infrastructure have access to the same DAGS_FOLDER)
>> > >
>> > > Now it is true that the DagBag loads DAG objects but think of it as
>> > > more of an "index" so that the scheduler/webserver know what DAGs
>> > > are available. When it's time to actually run one of those DAGs,
>> > > the executor loads it from the underlying source file.
>> > >
>> > > Jeremiah
>> > >
>> > > On Wed, Mar 22, 2017 at 8:45 AM Boris Tyukin <bo...@boristyukin.com>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have a weird question but it bugs my mind. I have some code like
>> > > > below to generate dags dynamically, using Max's example code from
>> > > > the FAQ.
>> > > >
>> > > > It works fine but I have one large dict (let's call it
>> > > > my_outer_dict) that takes over 60Mb in memory and I need to access
>> > > > it from all generated dags. Needless to say, I do not want to
>> > > > recreate that dict for every dag as I want to load it into memory
>> > > > only once.
>> > > >
>> > > > To my surprise, if I define that dict outside of my dag definition
>> > > > code, I can still access it.
>> > > >
>> > > > Can someone explain why, and where it is stored? I thought only dag
>> > > > definitions are loaded into the dagbag and not the variables
>> > > > outside of them.
>> > > >
>> > > > Is it even a good practice, and will it still work if I switch to
>> > > > the celery executor?
>> > > >
>> > > >
>> > > > def get_dag(i):
>> > > >     dag_id = 'foo_{}'.format(i)
>> > > >     dag = DAG(dag_id)
>> > > >     ....
>> > > >     print my_outer_dict
>> > > >
>> > > > my_outer_dict = {}
>> > > > for i in range(10):
>> > > >     dag = get_dag(i)
>> > > >     globals()[dag.dag_id] = dag
>> > > >
>> > >
>> >
>>
>
>
