[ 
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yingbo Wang updated AIRFLOW-3964:
---------------------------------
    Affects Version/s: 1.10.0
          Description: 
h2. Problem
h3. Airflow Sensor:

Sensors are a certain type of operator that will keep running until a certain 
criterion is met. Examples include a specific file landing in HDFS or S3, a 
partition appearing in Hive, or a specific time of the day. Sensors are derived 
from BaseSensorOperator and run a poke method at a specified poke_interval 
until it returns True.

Airflow Sensor duplication is a normal problem for large scale airflow project. 
There are duplicated partitions needing to be detected from same/different DAG. 
In Airbnb there are 88 boxes running four different types of sensors everyday. 
The number of running sensor tasks ranges from 8k to 16k, which takes great 
amount of resources. Although Airflow team had redirected all sensors to a 
specific queue to allocate relatively minor resource, there is still large room 
to reduce the number of workers and relief DB pressure by optimizing the sensor 
mechanism.

Existing sensor implementation creates an identical task for any sensor task 
with specific dag_id, task_id and execution_date. This task is responsible of 
keeping querying DB until the specified partitions exists. Even if two tasks 
are waiting for same partition in DB, they are creating two connections with 
the DB and checking the status in two separate processes. In one hand, DB need 
to run duplicate jobs in multiple processes which will take both cpu and memory 
resources. At the same time, Airflow need to maintain a process for each sensor 
to query and wait for the partition/table to be created.
h1. ***Design*

There are several issues need to be resolved for our smart sensor. 
h2. Persist sensor infor in DB and avoid file parsing before running

Current Airflow implementation need to parse the DAG python file before running 
a task. Parsing multiple python file in a smart sensor make the case low 
efficiency and overload. Since sensor tasks need relatively more “light weight” 
executing information -- less number of properties with simple structure (most 
are built in type instead of function or object). We propose to skip the 
parsing for smart sensor. The easiest way is to persist all task instance 
information in airflow metaDB. 
h3. Solution:

It will be hard to dump the whole task instance object dictionary. And we do 
not really need that much information. 

We add two sets to the base sensor class as “persist_fields” and 
“execute_fields”. 
h4. “persist_fields”  dump to airflow.task_instance column “attr_dict”

saves the attribute names that should be used to accomplish a sensor poking 
job. For example:
 #  the “NamedHivePartitionSensor” define its persist_fields as  
('partition_names', 'metastore_conn_id', 'hook') since these properties are 
enough for its poking function. 
 # While the HivePartitionSensor can be slightly different use persist_fields 
as ('schema', 'table', 'partition', 'metastore_conn_id')

If we have two tasks that have same property value for all field in 
persist_fields. That means these two tasks are poking the same item and they 
are holding duplicate poking jobs in senser. 

*The persist_fields can help us in deduplicate sensor tasks*. In a more broader 
way. If we can list persist_fields for all operators, it can help to dedup all 
airflow tasks.
h4. “Execute_fields” dump to airflow.task_instance column “exec_dict”

Saves the execution configuration such as “poke_interval”, “timeout”, 
“execution_timeout”

Fields in this set do not contain information affecting the poking job detail. 
They are related to how frequent should we poke, when should the task timeout, 
how many times timeout should be a fail etc. We only put those logic that we 
can easily handle in a smart sensor for now. This is a smart sensor “doable 
whitelist” and can be extended with more logic being “unlocked” by smart sensor 
implementation. 

 When we initialize a task instance object. We dump the attribute value of 
these two sets and persist them in the Airflow metaDB. Smart sensor can visit 
DB to get all required information of running sensor tasks and don’t need to 
parse any DAG files.
h2. Airflow scheduler change

We do not want to break any existing logic in scheduler. The smart sensor is a 
configurable mode and can be easily fallback to scheduler regular logic when it 
detects the case is not good for smart sensor.
h3. Solution
h4. Scheduler process_file

Right before we set a task instance state to “scheduled”, add smart sensor 
check to do:
 # Check if Airflow is configured as use smart sensor
 # Check if current task is good for smart sensor running

If both check got a “yes” that means the task instance is qualified for smart 
sensor. Airflow scheduler set its state to “smart_pending” instead of 
“scheduled” and this task instance will  *NOT BE QUEUED* to the executor. It is 
expected to be picked up by a smart sensor task from DB query. Smart sensor 
will update the state to final state (“success” or “failed”) or “up_for_retry” 
and it can come back to normal scheduler world.

If any of the above checks has a “no” answer, either current airflow cluster is 
not configured to use smart sensor or the task itself is out of smart sensor 
scope. the scheduler will schedule task instance just like no smart sensor 
exist.
h4. Include smart sensor DAG by configuration

We are using a smart sensor DAG to kick off all smart sensor tasks. If airflow 
is configured to use smart sensor. The DAG will be included in the scheduler 
parsing paths. Implementation similar as example dags.
h2. Smart sensor operator
h3. Smart sensor logic

In each execute loop:
 * refresh_all_dict(): Select all tasks from DB with state “smart_pending” or 
“smart_running” and shardcode qualified.
 * For all tasks in the task dictionary to poke:
 * If task with same persist_field has been poked in this round 
 * If task poked has a final state, don’t need to do anything
 * If task poked does not have a final state, need to handle timeout


 * Else (not poked in this round)
 * Execute the sensor job
 * For success or failed state, mark states in airflow DB for all tasks that
 * Have same persist_fields hashcode
 * State in (“smart_pending”, “smart_running”)


 * Check and handle timeout

Issue

Smart sensor need to handle the following issues:
 # Get multiple tasks qualified for smart sensor.
 # Do the work for all collected sensor tasks
 # Sensor tasks duplication.
 # Sharding when there are multiple smart sensor running.

Dedup and shard:

Attr_dict ⇒ hashcode ⇒ shardcode

Hashcode = hash(attr_dict)

Shardcode = Hashcode % (max_shard + 1)

The range of shardcode, number of smart sensor tasks can be configured in 
airflow.cfg

Each smart sensor task has a _shardcode range_ and only query tasks whose 
shardcode in this range. Duplicate sensor task will have the same hash code and 
same shardcode so they are going to be handled by the same smart sensor task.
h2. Schema change:
h3. Task_instance table: (add 4 columns and 2 indexes)

op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), nullable=True))

op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), nullable=True))

op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), 
nullable=True))

op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), 
nullable=True))

 

op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)

op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)
h2. Remaining Issue
 # Handle timeout: Save the timeout and execution_timeout in exec_dict column.
 # When a timeout was detected, set the single sensor task to failed or 
up_for_retry and expect scheduler set it to smart_pending as retry
 # Calculate the total seconds of final failed duration and keep the task in 
smart sensor state until it failed/success.  (min(timeout, execution_timeout) * 
(retries + 1))


 # Smart sensor DAG handle. Should it be manually or in source code.
 # Special logic for smart sensor health check.

  was:
h2. Problem
h3. Airflow Sensor:

Sensors are a certain type of operator that will keep running until a certain 
criterion is met. Examples include a specific file landing in HDFS or S3, a 
partition appearing in Hive, or a specific time of the day. Sensors are derived 
from BaseSensorOperator and run a poke method at a specified poke_interval 
until it returns True.

Airflow Sensor duplication is a normal problem for large scale airflow project. 
There are duplicated partitions needing to be detected from same/different DAG. 
In Airbnb there are 88 boxes running four different types of sensors everyday. 
The number of running sensor tasks ranges from 8k to 16k, which takes great 
amount of resources. Although Airflow team had redirected all sensors to a 
specific queue to allocate relatively minor resource, there is still large room 
to reduce the number of workers and relief DB pressure by optimizing the sensor 
mechanism.

Existing sensor implementation creates an identical task for any sensor task 
with specific dag_id, task_id and execution_date. This task is responsible of 
keeping querying DB until the specified partitions exists. Even if two tasks 
are waiting for same partition in DB, they are creating two connections with 
the DB and checking the status in two separate processes. In one hand, DB need 
to run duplicate jobs in multiple processes which will take both cpu and memory 
resources. At the same time, Airflow need to maintain a process for each sensor 
to query and wait for the partition/table to be created.

To optimize the sensor, add a hashcode for each partition decided by the set of 
(conn_id, schema, table, partition). Add dependencies between qualified sensors 
and partitions. Use a single entry for each sensor to query DB and avoid 
duplication in Airflow.

Add a sensor scheduling part in scheduler to:
 # Check partitions status to enable downstream sensor success and trigger 
sensor downstream tasks
 # Selecting all pending partitions in DB including:
 ## New coming partition sensor request
 ## Existing sensor request that is still waiting
 # With a time interval:
 ## Create the set of tasks for sensing all pending partitions.
 ## Kill previous sensor tasks
 # For the task mentioned in 3: Each task should check many partitions. We can 
introduce the sensor chunk number here for a maximum number of partitions one 
task should handle. The sensors keep updating partition status in Airflow DB 
during running.


> Reduce duplicated tasks and optimize with scheduler embedded sensor 
> --------------------------------------------------------------------
>
>                 Key: AIRFLOW-3964
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: dependencies, operators, scheduler
>    Affects Versions: 1.10.0
>            Reporter: Yingbo Wang
>            Assignee: Yingbo Wang
>            Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain 
> criterion is met. Examples include a specific file landing in HDFS or S3, a 
> partition appearing in Hive, or a specific time of the day. Sensors are 
> derived from BaseSensorOperator and run a poke method at a specified 
> poke_interval until it returns True.
> Airflow Sensor duplication is a normal problem for large scale airflow 
> project. There are duplicated partitions needing to be detected from 
> same/different DAG. In Airbnb there are 88 boxes running four different types 
> of sensors everyday. The number of running sensor tasks ranges from 8k to 
> 16k, which takes great amount of resources. Although Airflow team had 
> redirected all sensors to a specific queue to allocate relatively minor 
> resource, there is still large room to reduce the number of workers and 
> relief DB pressure by optimizing the sensor mechanism.
> Existing sensor implementation creates an identical task for any sensor task 
> with specific dag_id, task_id and execution_date. This task is responsible of 
> keeping querying DB until the specified partitions exists. Even if two tasks 
> are waiting for same partition in DB, they are creating two connections with 
> the DB and checking the status in two separate processes. In one hand, DB 
> need to run duplicate jobs in multiple processes which will take both cpu and 
> memory resources. At the same time, Airflow need to maintain a process for 
> each sensor to query and wait for the partition/table to be created.
> h1. ***Design*
> There are several issues need to be resolved for our smart sensor. 
> h2. Persist sensor infor in DB and avoid file parsing before running
> Current Airflow implementation need to parse the DAG python file before 
> running a task. Parsing multiple python file in a smart sensor make the case 
> low efficiency and overload. Since sensor tasks need relatively more “light 
> weight” executing information -- less number of properties with simple 
> structure (most are built in type instead of function or object). We propose 
> to skip the parsing for smart sensor. The easiest way is to persist all task 
> instance information in airflow metaDB. 
> h3. Solution:
> It will be hard to dump the whole task instance object dictionary. And we do 
> not really need that much information. 
> We add two sets to the base sensor class as “persist_fields” and 
> “execute_fields”. 
> h4. “persist_fields”  dump to airflow.task_instance column “attr_dict”
> saves the attribute names that should be used to accomplish a sensor poking 
> job. For example:
>  #  the “NamedHivePartitionSensor” define its persist_fields as  
> ('partition_names', 'metastore_conn_id', 'hook') since these properties are 
> enough for its poking function. 
>  # While the HivePartitionSensor can be slightly different use persist_fields 
> as ('schema', 'table', 'partition', 'metastore_conn_id')
> If we have two tasks that have same property value for all field in 
> persist_fields. That means these two tasks are poking the same item and they 
> are holding duplicate poking jobs in senser. 
> *The persist_fields can help us in deduplicate sensor tasks*. In a more 
> broader way. If we can list persist_fields for all operators, it can help to 
> dedup all airflow tasks.
> h4. “Execute_fields” dump to airflow.task_instance column “exec_dict”
> Saves the execution configuration such as “poke_interval”, “timeout”, 
> “execution_timeout”
> Fields in this set do not contain information affecting the poking job 
> detail. They are related to how frequent should we poke, when should the task 
> timeout, how many times timeout should be a fail etc. We only put those logic 
> that we can easily handle in a smart sensor for now. This is a smart sensor 
> “doable whitelist” and can be extended with more logic being “unlocked” by 
> smart sensor implementation. 
>  When we initialize a task instance object. We dump the attribute value of 
> these two sets and persist them in the Airflow metaDB. Smart sensor can visit 
> DB to get all required information of running sensor tasks and don’t need to 
> parse any DAG files.
> h2. Airflow scheduler change
> We do not want to break any existing logic in scheduler. The smart sensor is 
> a configurable mode and can be easily fallback to scheduler regular logic 
> when it detects the case is not good for smart sensor.
> h3. Solution
> h4. Scheduler process_file
> Right before we set a task instance state to “scheduled”, add smart sensor 
> check to do:
>  # Check if Airflow is configured as use smart sensor
>  # Check if current task is good for smart sensor running
> If both check got a “yes” that means the task instance is qualified for smart 
> sensor. Airflow scheduler set its state to “smart_pending” instead of 
> “scheduled” and this task instance will  *NOT BE QUEUED* to the executor. It 
> is expected to be picked up by a smart sensor task from DB query. Smart 
> sensor will update the state to final state (“success” or “failed”) or 
> “up_for_retry” and it can come back to normal scheduler world.
> If any of the above checks has a “no” answer, either current airflow cluster 
> is not configured to use smart sensor or the task itself is out of smart 
> sensor scope. the scheduler will schedule task instance just like no smart 
> sensor exist.
> h4. Include smart sensor DAG by configuration
> We are using a smart sensor DAG to kick off all smart sensor tasks. If 
> airflow is configured to use smart sensor. The DAG will be included in the 
> scheduler parsing paths. Implementation similar as example dags.
> h2. Smart sensor operator
> h3. Smart sensor logic
> In each execute loop:
>  * refresh_all_dict(): Select all tasks from DB with state “smart_pending” or 
> “smart_running” and shardcode qualified.
>  * For all tasks in the task dictionary to poke:
>  * If task with same persist_field has been poked in this round 
>  * If task poked has a final state, don’t need to do anything
>  * If task poked does not have a final state, need to handle timeout
>  * Else (not poked in this round)
>  * Execute the sensor job
>  * For success or failed state, mark states in airflow DB for all tasks that
>  * Have same persist_fields hashcode
>  * State in (“smart_pending”, “smart_running”)
>  * Check and handle timeout
> Issue
> Smart sensor need to handle the following issues:
>  # Get multiple tasks qualified for smart sensor.
>  # Do the work for all collected sensor tasks
>  # Sensor tasks duplication.
>  # Sharding when there are multiple smart sensor running.
> Dedup and shard:
> Attr_dict ⇒ hashcode ⇒ shardcode
> Hashcode = hash(attr_dict)
> Shardcode = Hashcode % (max_shard + 1)
> The range of shardcode, number of smart sensor tasks can be configured in 
> airflow.cfg
> Each smart sensor task has a _shardcode range_ and only query tasks whose 
> shardcode in this range. Duplicate sensor task will have the same hash code 
> and same shardcode so they are going to be handled by the same smart sensor 
> task.
> h2. Schema change:
> h3. Task_instance table: (add 4 columns and 2 indexes)
> op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), 
> nullable=True))
> op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), 
> nullable=True))
> op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), 
> nullable=True))
> op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), 
> nullable=True))
>  
> op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
> op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)
> h2. Remaining Issue
>  # Handle timeout: Save the timeout and execution_timeout in exec_dict column.
>  # When a timeout was detected, set the single sensor task to failed or 
> up_for_retry and expect scheduler set it to smart_pending as retry
>  # Calculate the total seconds of final failed duration and keep the task in 
> smart sensor state until it failed/success.  (min(timeout, execution_timeout) 
> * (retries + 1))
>  # Smart sensor DAG handle. Should it be manually or in source code.
>  # Special logic for smart sensor health check.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to