[ 
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192497#comment-17192497
 ] 

ASF subversion and git services commented on AIRFLOW-3964:
----------------------------------------------------------

Commit ac943c9e18f75259d531dbda8c51e650f57faa4c in airflow's branch 
refs/heads/master from Yingbo Wang
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=ac943c9 ]

[AIRFLOW-3964][AIP-17] Consolidate and de-dup sensor tasks using Smart Sensor 
(#5499)

Co-authored-by: Yingbo Wang <yingbo.w...@airbnb.com>

> Consolidate and de-duplicate sensor tasks 
> ------------------------------------------
>
>                 Key: AIRFLOW-3964
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: dependencies, operators, scheduler
>    Affects Versions: 1.10.0
>            Reporter: Yingbo Wang
>            Assignee: Yingbo Wang
>            Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain 
> criterion is met. Examples include a specific file landing in HDFS or S3, a 
> partition appearing in Hive, or a specific time of the day. Sensors are 
> derived from BaseSensorOperator and run a poke method at a specified 
> poke_interval until it returns True.
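> For reference, a minimal custom sensor might look roughly like the following 
> sketch (the partition-existence check is a hypothetical placeholder):
> {code:python}
> from airflow.sensors.base_sensor_operator import BaseSensorOperator
> from airflow.utils.decorators import apply_defaults
> 
> 
> class MyPartitionSensor(BaseSensorOperator):
>     """Hypothetical sensor that waits for a partition to appear."""
> 
>     @apply_defaults
>     def __init__(self, partition_name, *args, **kwargs):
>         super(MyPartitionSensor, self).__init__(*args, **kwargs)
>         self.partition_name = partition_name
> 
>     def poke(self, context):
>         # Called every `poke_interval` seconds until it returns True.
>         return partition_exists(self.partition_name)  # placeholder check
> {code}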
> Sensor duplication is a common problem for large-scale Airflow projects: the 
> same partition often needs to be detected by several tasks in the same or in 
> different DAGs. At Airbnb, 88 boxes run four different types of sensors every 
> day, and the number of running sensor tasks ranges from 8k to 16k, which takes 
> a great amount of resources. Although the Airflow team has redirected all 
> sensors to a dedicated queue to allocate relatively few resources to them, 
> there is still a lot of room to reduce the number of workers and relieve DB 
> pressure by optimizing the sensor mechanism.
> The existing sensor implementation creates a separate task for each sensor 
> with a specific dag_id, task_id and execution_date. This task is responsible 
> for repeatedly querying the DB until the specified partition exists. Even if 
> two tasks are waiting for the same partition, they create two connections to 
> the DB and check the status in two separate processes. On one hand, the DB has 
> to run duplicate jobs in multiple processes, which takes both CPU and memory 
> resources. On the other hand, Airflow has to maintain a process for each 
> sensor to query and wait for the partition/table to be created.
> h1. Design
> There are several issues that need to be resolved for our smart sensor.
> h2. Persist sensor info in DB and avoid file parsing before running
> The current Airflow implementation needs to parse the DAG Python file before 
> running a task. Parsing multiple Python files inside a smart sensor would be 
> inefficient and add overhead. Sensor tasks only need relatively “light weight” 
> execution information: a small number of properties with simple structure 
> (mostly built-in types rather than functions or objects). We therefore propose 
> to skip the parsing for smart sensors. The easiest way is to persist all 
> required task instance information in the Airflow metaDB.
> h3. Solution:
> It would be hard to dump the whole task instance object dictionary, and we do 
> not really need that much information.
> We add two sets to the base sensor class: “persist_fields” and 
> “execute_fields”.
> h4. “persist_fields” (dumped to airflow.task_instance column “attr_dict”)
> Holds the attribute names needed to accomplish a sensor's poking job. For 
> example:
>  # The “NamedHivePartitionSensor” defines its persist_fields as 
> ('partition_names', 'metastore_conn_id', 'hook'), since these properties are 
> enough for its poking function.
>  # The HivePartitionSensor is slightly different and uses persist_fields 
> ('schema', 'table', 'partition', 'metastore_conn_id').
> If two tasks have the same property values for all fields in persist_fields, 
> they are poking the same item and are duplicate poking jobs in the sensor.
> *The persist_fields can help us de-duplicate sensor tasks* (see the sketch 
> below). More broadly, if we could list persist_fields for all operators, they 
> could help dedup all Airflow tasks.
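> As a sketch of the idea (the attribute names come from the existing sensors; 
> the persist_fields class attribute and the helper are the proposed, 
> hypothetical additions):
> {code:python}
> class NamedHivePartitionSensor(BaseSensorOperator):
>     # Attributes that fully describe the poking job; two tasks with identical
>     # values for these fields are poking the same item and can be de-duped.
>     persist_fields = ('partition_names', 'metastore_conn_id', 'hook')
> 
> 
> class HivePartitionSensor(BaseSensorOperator):
>     persist_fields = ('schema', 'table', 'partition', 'metastore_conn_id')
> 
> 
> def build_attr_dict(task):
>     """Hypothetical helper: collect the values used for de-duplication."""
>     return {attr: getattr(task, attr, None) for attr in task.persist_fields}
> {code}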
> h4. “execute_fields” (dumped to airflow.task_instance column “exec_dict”)
> Holds the execution configuration such as “poke_interval”, “timeout” and 
> “execution_timeout”.
> Fields in this set do not affect the details of the poking job itself. They 
> control how frequently we should poke, when the task should time out, how many 
> timeouts count as a failure, etc. For now we only include logic that can 
> easily be handled in a smart sensor. This is a smart sensor “doable whitelist” 
> and can be extended as more logic is “unlocked” by the smart sensor 
> implementation.
> When we initialize a task instance object, we dump the attribute values of 
> these two sets and persist them in the Airflow metaDB (see the sketch below). 
> A smart sensor can then query the DB for all required information about 
> running sensor tasks and does not need to parse any DAG files.
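> A rough sketch of that dump step (column names follow the schema change below; 
> JSON serialization is an assumption for illustration):
> {code:python}
> import json
> 
> 
> def persist_sensor_info(task_instance, task):
>     # Hypothetical sketch: serialize the two field sets into the new
>     # task_instance columns so a smart sensor can poke without parsing DAGs.
>     # Values are assumed to be simple built-in types, per the proposal.
>     task_instance.attr_dict = json.dumps(
>         {attr: getattr(task, attr, None) for attr in task.persist_fields})
>     task_instance.exec_dict = json.dumps(
>         {attr: getattr(task, attr, None) for attr in task.execute_fields})
> {code}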
> h2. Airflow scheduler change
> We do not want to break any existing scheduler logic. The smart sensor is a 
> configurable mode and can easily fall back to the regular scheduler logic when 
> it detects that a case is not suitable for the smart sensor.
> h3. Solution
> h4. Scheduler process_file
> Right before setting a task instance state to “scheduled”, add a smart sensor 
> check that does the following (see the sketch after this list):
>  # Check whether Airflow is configured to use the smart sensor.
>  # Check whether the current task is suitable for smart sensor execution.
> If both checks answer “yes”, the task instance is qualified for the smart 
> sensor. The Airflow scheduler sets its state to “smart_pending” instead of 
> “scheduled”, and this task instance will *NOT BE QUEUED* to the executor. It 
> is expected to be picked up by a smart sensor task via a DB query. The smart 
> sensor will update the state to a final state (“success” or “failed”) or to 
> “up_for_retry”, at which point the task comes back to the normal scheduler 
> world.
> If either check answers “no”, then either the current Airflow cluster is not 
> configured to use the smart sensor or the task itself is out of smart sensor 
> scope, and the scheduler will schedule the task instance as if no smart sensor 
> existed.
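> A sketch of that decision point in the scheduler (the function, config section 
> and state names here are illustrative, not the actual implementation):
> {code:python}
> from airflow.configuration import conf
> from airflow.utils.state import State
> 
> 
> def schedule_or_defer_to_smart_sensor(ti, task):
>     # Hypothetical sketch of the check made right before queuing.
>     use_smart_sensor = conf.getboolean('smart_sensor', 'use_smart_sensor')
>     if use_smart_sensor and is_smart_sensor_compatible(task):  # placeholder
>         ti.state = 'smart_pending'   # picked up later by a smart sensor task
>     else:
>         ti.state = State.SCHEDULED   # normal path: queued to the executor
> {code}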
> h4. Include smart sensor DAG by configuration
> We use a smart sensor DAG to kick off all smart sensor tasks. If Airflow is 
> configured to use the smart sensor, this DAG will be included in the scheduler 
> parsing paths. The implementation is similar to the example DAGs.
> h2. Smart sensor operator
> h3. Smart sensor logic
> In each execute loop (see the sketch after this list):
>  * refresh_all_dict(): select all tasks from the DB whose state is 
> “smart_pending” or “smart_running” and whose shardcode qualifies.
>  * For each task in the task dictionary to poke:
>  ** If a task with the same persist_fields has already been poked in this 
> round:
>  *** If the poked task reached a final state, nothing needs to be done.
>  *** If the poked task did not reach a final state, handle timeout.
>  ** Else (not poked in this round):
>  *** Execute the sensor job.
>  *** On a success or failed state, mark states in the Airflow DB for all 
> tasks that:
>  **** have the same persist_fields hashcode, and
>  **** are in state “smart_pending” or “smart_running”.
>  *** Check and handle timeout.
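> A pseudo-code sketch of that loop (method and field names are illustrative):
> {code:python}
> def execute_loop(self):
>     # One pass of the (hypothetical) smart sensor operator.
>     sensor_works = self.refresh_all_dict()   # smart_pending/smart_running tasks
>     poked = {}                               # hashcode -> poke result this round
> 
>     for work in sensor_works:
>         if work.hashcode in poked:
>             # Duplicate of a task already poked this round.
>             if not poked[work.hashcode].is_final():
>                 self.handle_timeout(work)
>             continue
>         result = self.execute_sensor_job(work)   # run the actual poke
>         poked[work.hashcode] = result
>         if result.is_final():
>             # Mark success/failed for every task with the same persist_fields
>             # hashcode that is still in smart_pending or smart_running.
>             self.mark_duplicates(work.hashcode, result.state)
>         else:
>             self.handle_timeout(work)
> {code}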
> h3. Issues
> The smart sensor needs to handle the following:
>  # Getting multiple tasks qualified for the smart sensor.
>  # Doing the work for all collected sensor tasks.
>  # De-duplicating sensor tasks.
>  # Sharding when multiple smart sensors are running.
> h3. Dedup and shard
> attr_dict ⇒ hashcode ⇒ shardcode
> hashcode = hash(attr_dict)
> shardcode = hashcode % (max_shard + 1)
> The range of the shardcode and the number of smart sensor tasks can be 
> configured in airflow.cfg.
> Each smart sensor task has a _shardcode range_ and only queries tasks whose 
> shardcode falls in that range. Duplicate sensor tasks have the same hashcode 
> and the same shardcode, so they are handled by the same smart sensor task (see 
> the sketch below).
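> In code, the mapping could look roughly like this (a sketch; a stable digest 
> of the serialized attr_dict is assumed, since Python's built-in hash() is not 
> stable across processes):
> {code:python}
> import hashlib
> import json
> 
> 
> def compute_hashcode(attr_dict):
>     # Hypothetical: stable hash of the persist_fields values.
>     payload = json.dumps(attr_dict, sort_keys=True)
>     return int(hashlib.md5(payload.encode('utf-8')).hexdigest(), 16) % (2 ** 63)
> 
> 
> def compute_shardcode(hashcode, max_shard):
>     # max_shard comes from airflow.cfg; duplicates share a shardcode and are
>     # therefore handled by the same smart sensor task.
>     return hashcode % (max_shard + 1)
> {code}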
> h2. Schema change:
> h3. Task_instance table: (add 4 columns and 2 indexes)
> op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), nullable=True))
> op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), nullable=True))
> op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), nullable=True))
> op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), nullable=True))
>  
> op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
> op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)
> h2. Remaining Issues
>  # Handle timeout: save the timeout and execution_timeout in the exec_dict 
> column.
>  # When a timeout is detected, set the single sensor task to failed or 
> up_for_retry and expect the scheduler to set it back to smart_pending as a 
> retry.
>  # Calculate the total number of seconds before a final failure and keep the 
> task in a smart sensor state until it fails or succeeds: min(timeout, 
> execution_timeout) * (retries + 1) (see the sketch after this list).
>  # How the smart sensor DAG is handled: should it be added manually or shipped 
> in the source code?
>  # Special logic for smart sensor health checks.
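> For item 3, an illustrative calculation of that upper bound:
> {code:python}
> def total_fail_seconds(timeout, execution_timeout, retries):
>     # Keep the task under smart sensor control at most this many seconds
>     # before marking it finally failed (sketch only).
>     return min(timeout, execution_timeout) * (retries + 1)
> {code}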



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
