[
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192499#comment-17192499
]
ASF subversion and git services commented on AIRFLOW-3964:
----------------------------------------------------------
Commit ac943c9e18f75259d531dbda8c51e650f57faa4c in airflow's branch
refs/heads/master from Yingbo Wang
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=ac943c9 ]
[AIRFLOW-3964][AIP-17] Consolidate and de-dup sensor tasks using Smart Sensor
(#5499)
Co-authored-by: Yingbo Wang <[email protected]>
> Consolidate and de-duplicate sensor tasks
> ------------------------------------------
>
> Key: AIRFLOW-3964
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
> Project: Apache Airflow
> Issue Type: Improvement
> Components: dependencies, operators, scheduler
> Affects Versions: 1.10.0
> Reporter: Yingbo Wang
> Assignee: Yingbo Wang
> Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain
> criterion is met. Examples include a specific file landing in HDFS or S3, a
> partition appearing in Hive, or a specific time of the day. Sensors are
> derived from BaseSensorOperator and run a poke method at a specified
> poke_interval until it returns True.
> Airflow sensor duplication is a common problem for large-scale Airflow projects. The same partition often needs to be detected by multiple tasks in the same or different DAGs. At Airbnb there are 88 boxes running four different types of sensors every day. The number of running sensor tasks ranges from 8k to 16k, which takes a great amount of resources. Although the Airflow team has redirected all sensors to a dedicated queue that is allocated relatively few resources, there is still large room to reduce the number of workers and relieve DB pressure by optimizing the sensor mechanism.
> The existing sensor implementation creates a separate task for each sensor with a specific dag_id, task_id and execution_date. This task is responsible for querying the DB repeatedly until the specified partition exists. Even if two tasks are waiting for the same partition, they create two connections to the DB and check its status in two separate processes. On one hand, the DB has to run duplicate jobs in multiple processes, which takes both CPU and memory resources. On the other hand, Airflow has to maintain a process for each sensor to query and wait for the partition/table to be created.
> h1. Design
> There are several issues that need to be resolved for our smart sensor.
> h2. Persist sensor info in the DB and avoid file parsing before running
> The current Airflow implementation needs to parse the DAG python file before running a task. Parsing multiple python files inside a smart sensor would make it inefficient and overloaded. Sensor tasks only need relatively “lightweight” execution information -- a small number of properties with a simple structure (most are built-in types instead of functions or objects) -- so we propose to skip the parsing for smart sensor. The easiest way is to persist all task instance information in the Airflow metaDB.
> h3. Solution:
> It would be hard to dump the whole task instance object dictionary, and we do not really need that much information.
> We add two sets to the base sensor class: “persist_fields” and “execute_fields”.
> h4. “persist_fields” (dumped to the airflow.task_instance column “attr_dict”)
> Saves the attribute names that are needed to accomplish a sensor poking job. For example:
> # The “NamedHivePartitionSensor” defines its persist_fields as ('partition_names', 'metastore_conn_id', 'hook'), since these properties are enough for its poking function.
> # The HivePartitionSensor is slightly different and uses ('schema', 'table', 'partition', 'metastore_conn_id') as its persist_fields.
> If two tasks have the same property value for every field in persist_fields, they are poking the same item and therefore holding duplicate poking jobs in the sensor. *The persist_fields can help us deduplicate sensor tasks.* More broadly, if we can list persist_fields for all operators, it can help to dedup all Airflow tasks.
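> A minimal sketch of the idea, assuming a class-level persist_fields tuple and a small helper; the sketch class name and the poke_signature helper are illustrative assumptions, not the committed API:
> {code:python}
> from airflow.sensors.base_sensor_operator import BaseSensorOperator
>
>
> class NamedHivePartitionSensorSketch(BaseSensorOperator):
>     # Only these attributes are needed to reproduce the poke job, so only
>     # they would be persisted to task_instance.attr_dict.
>     persist_fields = ('partition_names', 'metastore_conn_id', 'hook')
>
>
> def poke_signature(task):
>     # Two sensor tasks with identical signatures poke the same item and can
>     # be collapsed into a single poking job by the smart sensor.
>     return tuple(str(getattr(task, field, None)) for field in task.persist_fields)
> {code}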
> h4. “execute_fields” (dumped to the airflow.task_instance column “exec_dict”)
> Saves the execution configuration such as “poke_interval”, “timeout” and “execution_timeout”.
> Fields in this set do not affect the detail of the poking job. They control how frequently we should poke, when the task should time out, how many timeouts should count as a failure, and so on. For now we only include logic that we can easily handle in a smart sensor. This is a smart sensor “doable whitelist” and can be extended as more logic is “unlocked” by the smart sensor implementation.
> When we initialize a task instance object, we dump the attribute values of these two sets and persist them in the Airflow metaDB. The smart sensor can then visit the DB to get all the information required to run sensor tasks and does not need to parse any DAG files.
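> A rough sketch of the dump step, assuming JSON serialization into the two new task_instance columns; the helper name and the use of json here are assumptions of this sketch:
> {code:python}
> import json
>
>
> def dump_sensor_fields(task):
>     # Collect only the whitelisted attributes so the smart sensor can run the
>     # poke without parsing the DAG file.
>     attr_dict = {f: getattr(task, f, None) for f in task.persist_fields}
>     exec_dict = {f: getattr(task, f, None) for f in task.execute_fields}
>     # These strings would be written to task_instance.attr_dict and
>     # task_instance.exec_dict when the task instance is initialized.
>     return json.dumps(attr_dict, default=str), json.dumps(exec_dict, default=str)
> {code}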
> h2. Airflow scheduler change
> We do not want to break any existing logic in the scheduler. Smart sensor is a configurable mode and can easily fall back to the scheduler's regular logic when it detects that a case is not suitable for smart sensor.
> h3. Solution
> h4. Scheduler process_file
> Right before we set a task instance state to “scheduled”, add a smart sensor check that does the following:
> # Check whether Airflow is configured to use smart sensor.
> # Check whether the current task is suitable for smart sensor running.
> If both checks answer “yes”, the task instance is qualified for smart sensor. The Airflow scheduler sets its state to “smart_pending” instead of “scheduled”, and this task instance will *NOT BE QUEUED* to the executor. It is expected to be picked up by a smart sensor task via a DB query. The smart sensor will update the state to a final state (“success” or “failed”) or to “up_for_retry”, at which point it comes back to the normal scheduler world.
> If either check answers “no” (the current Airflow cluster is not configured to use smart sensor, or the task itself is out of smart sensor scope), the scheduler schedules the task instance as if no smart sensor existed.
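> A hedged sketch of that branch; the config section/key and the is_smart_sensor_compatible() helper are assumptions standing in for whatever the implementation settles on:
> {code:python}
> from airflow.configuration import conf
>
>
> def choose_scheduling_state(task_instance):
>     # Runs right before the scheduler would set the state to "scheduled".
>     # Section and key names below are assumptions of this sketch.
>     use_smart_sensor = conf.getboolean('smart_sensor', 'use_smart_sensor')
>     if use_smart_sensor and task_instance.task.is_smart_sensor_compatible():
>         # Not queued to the executor; a smart sensor task picks it up from the DB.
>         return 'smart_pending'
>     # Regular scheduling path, unchanged.
>     return 'scheduled'
> {code}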
> h4. Include smart sensor DAG by configuration
> We use a smart sensor DAG to kick off all smart sensor tasks. If Airflow is configured to use smart sensor, the DAG is included in the scheduler parsing paths. The implementation is similar to the example DAGs.
> h2. Smart sensor operator
> h3. Smart sensor logic
> In each execute loop (a sketch follows this list):
> * refresh_all_dict(): select all tasks from the DB with state “smart_pending” or “smart_running” and a qualified shardcode.
> * For each task in the task dictionary to poke:
> ** If a task with the same persist_fields has already been poked in this round:
> *** If the poked task reached a final state, nothing needs to be done.
> *** If the poked task did not reach a final state, handle timeout.
> ** Else (not poked in this round):
> *** Execute the sensor job.
> *** On a success or failed state, mark the state in the Airflow DB for all tasks that:
> **** have the same persist_fields hashcode, and
> **** are in state (“smart_pending”, “smart_running”).
> *** Check and handle timeout.
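> The loop above, roughly, as a sketch; every method and attribute name here (refresh_all_dict, execute_sensor_job, mark_state_for_hashcode, handle_timeout, is_final) is a placeholder for illustration, not the committed API:
> {code:python}
> def execute_one_loop(smart_sensor):
>     # hashcode -> result of the poke already executed in this round
>     poked = {}
>     # Rows in state "smart_pending"/"smart_running" whose shardcode falls in
>     # this smart sensor task's range.
>     sensor_works = smart_sensor.refresh_all_dict()
>     for work in sensor_works:
>         if work.hashcode in poked:
>             # A duplicate was already poked in this round.
>             if not poked[work.hashcode].is_final:
>                 smart_sensor.handle_timeout(work)
>             continue
>         result = smart_sensor.execute_sensor_job(work)  # run the actual poke
>         poked[work.hashcode] = result
>         if result.is_final:
>             # Mark every task with the same persist_fields hashcode and state in
>             # ("smart_pending", "smart_running") with the same final state.
>             smart_sensor.mark_state_for_hashcode(work.hashcode, result.state)
>         else:
>             smart_sensor.handle_timeout(work)
> {code}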
> h3. Issues
> The smart sensor needs to handle the following issues:
> # Get multiple tasks qualified for smart sensor.
> # Do the work for all collected sensor tasks.
> # Sensor task deduplication.
> # Sharding when there are multiple smart sensors running.
> Dedup and shard (see the sketch below):
> attr_dict ⇒ hashcode ⇒ shardcode
> hashcode = hash(attr_dict)
> shardcode = hashcode % (max_shard + 1)
> The range of shardcode and the number of smart sensor tasks can be configured in airflow.cfg.
> Each smart sensor task has a _shardcode range_ and only queries tasks whose shardcode falls in that range. Duplicate sensor tasks have the same hashcode and therefore the same shardcode, so they are handled by the same smart sensor task.
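> A small sketch of the hashcode/shardcode computation; hashing the JSON-serialized attr_dict with md5 is an assumption of this sketch (Python's built-in hash() is not stable across processes, so some stable hash is needed), and max_shard would come from airflow.cfg:
> {code:python}
> import hashlib
> import json
>
>
> def compute_shard(attr_dict, max_shard):
>     # Duplicate sensor tasks produce identical attr_dicts, hence identical
>     # hashcodes and shardcodes, and land on the same smart sensor task.
>     encoded = json.dumps(attr_dict, sort_keys=True, default=str).encode('utf-8')
>     hashcode = int(hashlib.md5(encoded).hexdigest(), 16) % (2 ** 63)  # fits the BigInteger column
>     shardcode = hashcode % (max_shard + 1)
>     return hashcode, shardcode
> {code}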
> h2. Schema change:
> h3. Task_instance table: (add 4 columns and 2 indexes)
> {code:python}
> # Inside an Alembic migration's upgrade():
> from alembic import op
> import sqlalchemy as sa
>
> op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), nullable=True))
> op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), nullable=True))
> op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), nullable=True))
> op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), nullable=True))
>
> op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
> op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)
> {code}
> h2. Remaining issues
> # Handle timeout: save the timeout and execution_timeout in the exec_dict column.
> ## When a timeout is detected, set the single sensor task to failed or up_for_retry and expect the scheduler to set it back to smart_pending as a retry.
> ## Calculate the total seconds of the final failure duration and keep the task in a smart sensor state until it fails/succeeds: min(timeout, execution_timeout) * (retries + 1) (see the sketch after this list).
> # Smart sensor DAG handling: should it be created manually or in source code?
> # Special logic for smart sensor health checks.
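> For the retry budget in the first item, a hedged sketch of the arithmetic on fields already stored in exec_dict:
> {code:python}
> def total_allowed_seconds(timeout, execution_timeout, retries):
>     # Keep the task under smart sensor control for at most this many seconds
>     # before declaring the final failure.
>     per_try = min(timeout, execution_timeout)
>     return per_try * (retries + 1)
> {code}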
--
This message was sent by Atlassian Jira
(v8.3.4#803005)