[ 
https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175892#comment-17175892
 ] 

ASF GitHub Bot commented on AIRFLOW-3964:
-----------------------------------------

YingboWang commented on a change in pull request #5499:
URL: https://github.com/apache/airflow/pull/5499#discussion_r468922578



##########
File path: airflow/sensors/smart_sensor_operator.py
##########
@@ -0,0 +1,730 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import datetime
+import json
+import logging
+import time
+import traceback
+from logging.config import DictConfigurator  # type: ignore
+from time import sleep
+
+from sqlalchemy import and_, or_, tuple_
+
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.models import BaseOperator, SensorInstance, SkipMixin, 
TaskInstance
+from airflow.settings import LOGGING_CLASS_PATH
+from airflow.stats import Stats
+from airflow.utils import helpers, timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.email import send_email
+from airflow.utils.log.logging_mixin import set_context
+from airflow.utils.module_loading import import_string
+from airflow.utils.net import get_hostname
+from airflow.utils.session import provide_session
+from airflow.utils.state import PokeState, State
+from airflow.utils.timeout import timeout
+
+config = import_string(LOGGING_CLASS_PATH)
+handler_config = config['handlers']['task']
+try:
+    formatter_config = config['formatters'][handler_config['formatter']]
+except Exception as err:  # pylint: disable=broad-except
+    formatter_config = None
+    print(err)
+dictConfigurator = DictConfigurator(config)
+
+
+class SensorWork:
+    """
+    This class stores a sensor work with decoded context value. It is only used
+    inside of smart sensor.
+    """
+    def __init__(self, ti):
+        self.dag_id = ti.dag_id
+        self.task_id = ti.task_id
+        self.execution_date = ti.execution_date
+        self.try_number = ti.try_number
+
+        self.poke_context = json.loads(ti.poke_context) if ti.poke_context 
else {}
+        self.execution_context = json.loads(ti.execution_context) if 
ti.execution_context else {}
+        try:
+            self.log = self._get_sensor_logger(ti)
+        except Exception as e:  # pylint: disable=broad-except
+            self.log = None
+            print(e)
+        self.hashcode = ti.hashcode
+        self.start_date = ti.start_date
+        self.operator = ti.operator
+        self.op_classpath = ti.op_classpath
+        self.encoded_poke_context = ti.poke_context
+
+    def __eq__(self, other):
+        if not isinstance(other, SensorWork):
+            return NotImplemented
+
+        return self.dag_id == other.dag_id and \
+            self.task_id == other.task_id and \
+            self.execution_date == other.execution_date and \
+            self.try_number == other.try_number
+
+    @staticmethod
+    def create_new_task_handler():
+        """
+        Create task log handler for a sensor work.
+        :return: log handler
+        """
+        handler_config_copy = {k: handler_config[k] for k in handler_config}
+        formatter_config_copy = {k: formatter_config[k] for k in 
formatter_config}
+        handler = dictConfigurator.configure_handler(handler_config_copy)
+        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
+        handler.setFormatter(formatter)
+        return handler
+
+    def _get_sensor_logger(self, ti):
+        # TODO: should be somewhere else, but not this file, has to use 
LOG_ID_TEMPLATE from es
+        # but how about other log file handler?
+        ti.raw = False  # Otherwise set_context will fail
+        log_id = "-".join([ti.dag_id,
+                           ti.task_id,
+                           ti.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"),
+                           str(ti.try_number)])
+        logger = logging.getLogger('airflow.task' + '.' + log_id)
+
+        if len(logger.handlers) == 0:
+            handler = self.create_new_task_handler()
+            logger.addHandler(handler)
+            set_context(logger, ti)
+
+            line_break = ("-" * 120)
+            logger.info(line_break)
+            logger.info("Processing sensor task %s in smart sensor service on 
host: %s",
+                        self.ti_key, get_hostname())
+            logger.info(line_break)
+        return logger
+
+    def close_sensor_logger(self):
+        """
+        Close log handler for a sensor work.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.close()
+            except Exception as e:  # pylint: disable=broad-except
+                print(e)
+
+    @property
+    def ti_key(self):
+        """
+        Key for the task instance that maps to the sensor work.
+        """
+        return self.dag_id, self.task_id, self.execution_date
+
+    @property
+    def cache_key(self):
+        """
+        Key used to query in smart sensor for cached sensor work.
+        """
+        return self.operator, self.encoded_poke_context
+
+
+class CachedPokeWork:
+    """
+    Wrapper class for the poke work inside smart sensor. It saves
+    the sensor_task used to poke and recent poke result state
+    """
+    def __init__(self):
+        self.state = None
+        self.sensor_task = None
+        self.last_poke_time = None
+        self.to_flush = False
+
+    def set_state(self, state):
+        """
+        Set state for cached poke work.
+        :param state:
+        """
+        self.state = state
+        self.last_poke_time = timezone.utcnow()
+
+    def clear_state(self):
+        """
+        Clear state for cached poke work.
+        """
+        self.state = None
+
+    def set_to_flush(self):
+        """
+        Mark this poke work to be popped from cached dict after current loop.
+        """
+        self.to_flush = True
+
+    def is_expired(self):
+        """
+        The cached task object expires if there is no poke for 20 mins.
+        :return: Boolean
+        """
+        return self.to_flush or (timezone.utcnow() - 
self.last_poke_time).total_seconds() > 1200
+
+
+class SensorExceptionInfo:
+    """
+    Hold sensor exception information and the type of exception. For possible 
transient
+    infra failure, give the task more chance to retry before fail it.
+    """
+    def __init__(self,
+                 exception_info,
+                 is_infra_failure=False,
+                 infra_failure_retry_window=datetime.timedelta(minutes=130)):
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+        self._infra_failure_retry_window = infra_failure_retry_window
+
+        self._infra_failure_timeout = None
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_latest_exception(self, exception_info, is_infra_failure=False):
+        """
+        This function set the latest exception information for sensor 
exception. If the exception
+        implies an infra failure, this function will check the recorded infra 
failure timeout
+        which was set at the first infra failure exception arrives. There is a 
6 hours window
+        for retry without failing current run.
+
+        :param exception_info:
+        :param is_infra_failure:
+        :return:
+        """
+        self._exception_info = exception_info
+        self._is_infra_failure = is_infra_failure
+
+        self.set_infra_failure_timeout()
+        self.fail_current_run = self.should_fail_current_run()
+
+    def set_infra_failure_timeout(self):
+        """
+        Set the time point when the sensor should be failed if it kept getting 
infra
+        failure.
+        :return:
+        """
+        # Only set the infra_failure_timeout if there is no existing one
+        if not self._is_infra_failure:
+            self._infra_failure_timeout = None
+        elif self._infra_failure_timeout is None:
+            self._infra_failure_timeout = timezone.utcnow() + 
self._infra_failure_retry_window
+
+    def should_fail_current_run(self):
+        """
+        :return: Should the sensor fail
+        :type: boolean
+        """
+        return not self.is_infra_failure or timezone.utcnow() > 
self._infra_failure_timeout
+
+    @property
+    def exception_info(self):
+        """
+        :return: exception msg.
+        """
+        return self._exception_info
+
+    @property
+    def is_infra_failure(self):
+        """
+
+        :return: If the exception is an infra failure
+        :type: boolean
+        """
+        return self._is_infra_failure
+
+    def is_expired(self):
+        """
+        :return: If current exception need to be kept.
+        :type: boolean
+        """
+        if not self._is_infra_failure:
+            return True
+        return timezone.utcnow() > self._infra_failure_timeout + 
datetime.timedelta(minutes=30)
+
+
+class SmartSensorOperator(BaseOperator, SkipMixin):
+    """
+    Smart sensor operators are derived from this class.
+
+    Smart Sensor operators keep refresh a dictionary by visiting DB.
+    Taking qualified active sensor tasks. Different from sensor operator,
+    Smart sensor operators poke for all sensor tasks in the dictionary at
+    a time interval. When a criteria is met or fail by time out, it update
+    all sensor task state in task_instance table
+
+    :param soft_fail: Set to true to mark the task as SKIPPED on failure
+    :type soft_fail: bool
+    :param poke_interval: Time in seconds that the job should wait in
+        between each tries
+    :type poke_interval: int
+    :param timeout: Time, in seconds before the task times out and fails.
+    :type timeout: int
+    :type mode: str
+    """
+    ui_color = '#e6f1f2'
+
+    @apply_defaults
+    def __init__(self,
+                 poke_interval=180,
+                 smart_sensor_timeout=60 * 60 * 24 * 7,
+                 soft_fail=False,
+                 shard_min=0,
+                 shard_max=100000,
+                 poke_exception_cache_ttl=600,
+                 poke_timeout=6,
+                 poke_exception_to_fail_task_threshold=3,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
+        self.poke_interval = poke_interval
+        self.soft_fail = soft_fail
+        self.timeout = smart_sensor_timeout
+        self._validate_input_values()
+        self.hostname = ""
+
+        self.sensor_works = []
+        self.cached_dedup_works = {}
+        self.cached_sensor_exceptions = {}
+
+        self.max_tis_per_query = 50
+        self.shard_min = shard_min
+        self.shard_max = shard_max
+        self.poke_exception_cache_ttl = poke_exception_cache_ttl
+        self.poke_timeout = poke_timeout
+        self._poke_exception_to_fail_task_threshold = 
poke_exception_to_fail_task_threshold
+
+    def _validate_input_values(self):
+        if not isinstance(self.poke_interval, (int, float)) or 
self.poke_interval < 0:
+            raise AirflowException(
+                "The poke_interval must be a non-negative number")
+        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
+            raise AirflowException(
+                "The timeout must be a non-negative number")
+
+    @provide_session
+    def _load_sensor_works(self, session=None):
+        """
+        Refresh sensor instances need to be handled by this operator. Put the 
context,
+        hashcode and sensor instance start_date in a wrapper class
+        :param session:
+        :return:
+        """
+        SI = SensorInstance
+        start_query_time = time.time()
+        query = session.query(SI) \
+            .filter(SI.state == State.SENSING)\
+            .filter(SI.shardcode < self.shard_max,
+                    SI.shardcode >= self.shard_min)
+        tis = query.all()
+        session.commit()
+        self.log.info("Performance query %s tis, time: %s", len(tis), 
time.time() - start_query_time)
+
+        # Query without checking dagrun state might keep some failed dag_run 
tasks alive.
+        # Join with DagRun table will be very slow based on the number of 
sensor tasks we
+        # need to handle. We query all smart tasks in this operator
+        # and expect scheduler correct the states in 
_change_state_for_tis_without_dagrun()
+
+        sensor_works = []
+        for ti in tis:
+            try:
+                sensor_works.append(SensorWork(ti))
+            except Exception as e:  # pylint: disable=broad-except
+                self.log.exception("Exception at creating sensor work for ti 
%s", ti.key)
+                self.log.exception(e, exc_info=True)
+
+        self.log.info("%d tasks detected.", len(sensor_works))
+
+        new_sensor_works = [x for x in sensor_works if x not in 
self.sensor_works]
+
+        self._update_ti_hostname(new_sensor_works)
+
+        self.sensor_works = sensor_works
+
+    @provide_session
+    def _update_ti_hostname(self, sensor_works, session=None):
+        """
+        Update task instance hostname for new sensor works.
+        :param sensor_works:
+        :return:
+        """
+        TI = TaskInstance
+        ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in 
sensor_works]
+
+        def update_ti_hostname_with_count(count, ti_keys):
+            # Using or_ instead of in_ here to prevent from full table scan.
+            tis = session.query(TI) \
+                .filter(or_(tuple_(TI.dag_id, TI.task_id, TI.execution_date) 
== ti_key
+                            for ti_key in ti_keys)) \
+                .all()
+
+            for ti in tis:
+                ti.hostname = self.hostname
+            session.commit()
+
+            return count + len(ti_keys)
+
+        count = helpers.reduce_in_chunks(update_ti_hostname_with_count, 
ti_keys, 0, self.max_tis_per_query)
+        if count:
+            self.log.info("Updated hostname on %s tis.", count)
+
+    @provide_session
+    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, 
state, session=None):
+        """
+        Mark state for multiple tasks that have hashcode=poke_hash.
+        :param poke_hash:
+        :param state:
+        :param session:
+        :return:
+        """
+
+        def mark_state(ti, sensor_instance):
+            ti.state = state
+            sensor_instance.state = state
+            if state in State.finished():
+                ti.end_date = end_date
+                ti.set_duration()
+
+        SI = SensorInstance
+        TI = TaskInstance
+
+        count_marked = 0
+        try:
+            query_result = session.query(TI, SI)\
+                .join(TI, and_(TI.dag_id == SI.dag_id,
+                               TI.task_id == SI.task_id,
+                               TI.execution_date == SI.execution_date)) \
+                .filter(SI.state == State.SENSING) \
+                .filter(SI.hashcode == poke_hash) \
+                .filter(SI.operator == operator) \
+                .with_for_update().all()
+
+            end_date = timezone.utcnow()
+            for ti, sensor_instance in query_result:
+                if sensor_instance.poke_context != encoded_poke_context:
+                    continue
+
+                ti.hostname = self.hostname
+                if ti.state == State.SENSING:
+                    mark_state(ti=ti, sensor_instance=sensor_instance)
+                    count_marked += 1
+                else:
+                    # ti.state != State.SENSING

Review comment:
       Task instance states can be changed by user activity, such as marking 
dagrun and ti states from the UI. The `sensor_instance` table respects any 
`task_instance` state change that happens during its poking phase, so if a 
user marks a ti state, it will not be overwritten by the smart sensor.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Consolidate and de-duplicate sensor tasks 
> ------------------------------------------
>
>                 Key: AIRFLOW-3964
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: dependencies, operators, scheduler
>    Affects Versions: 1.10.0
>            Reporter: Yingbo Wang
>            Assignee: Yingbo Wang
>            Priority: Critical
>
> h2. Problem
> h3. Airflow Sensor:
> Sensors are a certain type of operator that will keep running until a certain 
> criterion is met. Examples include a specific file landing in HDFS or S3, a 
> partition appearing in Hive, or a specific time of the day. Sensors are 
> derived from BaseSensorOperator and run a poke method at a specified 
> poke_interval until it returns True.
> Airflow Sensor duplication is a common problem for large-scale Airflow 
> projects. The same partitions often need to be detected from the same or 
> different DAGs. At Airbnb there are 88 boxes running four different types 
> of sensors every day. The number of running sensor tasks ranges from 8k to 
> 16k, which takes a great amount of resources. Although the Airflow team has 
> redirected all sensors to a specific queue to allocate relatively minor 
> resources, there is still a lot of room to reduce the number of workers and 
> relieve DB pressure by optimizing the sensor mechanism.
> The existing sensor implementation creates a separate task for every sensor 
> task with a specific dag_id, task_id and execution_date. This task is 
> responsible for querying the DB repeatedly until the specified partitions 
> exist. Even if two tasks are waiting for the same partition in the DB, they 
> create two connections to the DB and check the status in two separate 
> processes. On one hand, the DB needs to run duplicate jobs in multiple 
> processes, which takes both CPU and memory resources. On the other hand, 
> Airflow needs to maintain a process for each sensor to query and wait for 
> the partition/table to be created.
> h1. *Design*
> There are several issues that need to be resolved for our smart sensor. 
> h2. Persist sensor info in DB and avoid file parsing before running
> The current Airflow implementation needs to parse the DAG Python file before 
> running a task. Parsing multiple Python files in a smart sensor would be 
> inefficient and add load. Since sensor tasks need relatively “lightweight” 
> execution information -- a small number of properties with simple 
> structure (mostly built-in types rather than functions or objects) -- we 
> propose to skip the parsing for smart sensor. The easiest way is to persist 
> all task instance information in the Airflow metaDB. 
> h3. Solution:
> It will be hard to dump the whole task instance object dictionary. And we do 
> not really need that much information. 
> We add two sets to the base sensor class as “persist_fields” and 
> “execute_fields”. 
> h4. “persist_fields”  dump to airflow.task_instance column “attr_dict”
> saves the attribute names that should be used to accomplish a sensor poking 
> job. For example:
>  #  the “NamedHivePartitionSensor” define its persist_fields as  
> ('partition_names', 'metastore_conn_id', 'hook') since these properties are 
> enough for its poking function. 
>  # While the HivePartitionSensor can be slightly different use persist_fields 
> as ('schema', 'table', 'partition', 'metastore_conn_id')
> If two tasks have the same property values for all fields in 
> persist_fields, these two tasks are poking the same item and they 
> are holding duplicate poking jobs in the sensor. 
> *The persist_fields can help us deduplicate sensor tasks*. More 
> broadly, if we can list persist_fields for all operators, it can help to 
> dedup all Airflow tasks.
> h4. “Execute_fields” dump to airflow.task_instance column “exec_dict”
> Saves the execution configuration such as “poke_interval”, “timeout”, 
> “execution_timeout”
> Fields in this set do not contain information affecting the details of the 
> poking job. They relate to how frequently we should poke, when the task 
> should time out, how many timeouts should count as a failure, etc. We only 
> include logic that we can easily handle in a smart sensor for now. This is a 
> smart sensor “doable whitelist” and can be extended as more logic is 
> “unlocked” by the smart sensor implementation. 
>  When we initialize a task instance object, we dump the attribute values of 
> these two sets and persist them in the Airflow metaDB. The smart sensor can 
> query the DB to get all required information about running sensor tasks and 
> doesn’t need to parse any DAG files.
> h2. Airflow scheduler change
> We do not want to break any existing logic in the scheduler. The smart sensor 
> is a configurable mode and can easily fall back to the scheduler's regular 
> logic when it detects that a case is not suitable for smart sensor.
> h3. Solution
> h4. Scheduler process_file
> Right before we set a task instance state to “scheduled”, add a smart sensor 
> check that does the following:
>  # Check if Airflow is configured to use smart sensor
>  # Check if the current task is suitable for running in smart sensor
> If both checks return “yes”, the task instance is qualified for smart 
> sensor. The Airflow scheduler sets its state to “smart_pending” instead of 
> “scheduled” and this task instance will *NOT BE QUEUED* to the executor. It 
> is expected to be picked up by a smart sensor task via a DB query. The smart 
> sensor will update the state to a final state (“success” or “failed”) or 
> “up_for_retry”, after which it returns to the normal scheduler flow.
> If any of the above checks returns “no”, either the current Airflow cluster 
> is not configured to use smart sensor or the task itself is out of smart 
> sensor scope. The scheduler will then schedule the task instance just as if 
> no smart sensor existed.
> h4. Include smart sensor DAG by configuration
> We are using a smart sensor DAG to kick off all smart sensor tasks. If 
> Airflow is configured to use smart sensor, the DAG will be included in the 
> scheduler parsing paths. The implementation is similar to the example DAGs.
> h2. Smart sensor operator
> h3. Smart sensor logic
> In each execute loop:
>  * refresh_all_dict(): Select all tasks from DB with state “smart_pending” or 
> “smart_running” and shardcode qualified.
>  * For all tasks in the task dictionary to poke:
>  * If task with same persist_field has been poked in this round 
>  * If task poked has a final state, don’t need to do anything
>  * If task poked does not have a final state, need to handle timeout
>  * Else (not poked in this round)
>  * Execute the sensor job
>  * For success or failed state, mark states in airflow DB for all tasks that
>  * Have same persist_fields hashcode
>  * State in (“smart_pending”, “smart_running”)
>  * Check and handle timeout
> Issue
> Smart sensor need to handle the following issues:
>  # Get multiple tasks qualified for smart sensor.
>  # Do the work for all collected sensor tasks
>  # Sensor tasks duplication.
>  # Sharding when there are multiple smart sensor running.
> Dedup and shard:
> Attr_dict ⇒ hashcode ⇒ shardcode
> Hashcode = hash(attr_dict)
> Shardcode = Hashcode % (max_shard + 1)
> The range of shardcode and the number of smart sensor tasks can be configured 
> in airflow.cfg.
> Each smart sensor task has a _shardcode range_ and only queries tasks whose 
> shardcode is in this range. Duplicate sensor tasks will have the same hash 
> code and the same shardcode, so they are going to be handled by the same 
> smart sensor task.
> h2. Schema change:
> h3. Task_instance table: (add 4 columns and 2 indexes)
> op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), 
> nullable=True))
> op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), 
> nullable=True))
> op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), 
> nullable=True))
> op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), 
> nullable=True))
>  
> op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
> op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)
> h2. Remaining Issue
>  # Handle timeout: Save the timeout and execution_timeout in exec_dict column.
>  # When a timeout was detected, set the single sensor task to failed or 
> up_for_retry and expect scheduler set it to smart_pending as retry
>  # Calculate the total seconds of final failed duration and keep the task in 
> smart sensor state until it failed/success.  (min(timeout, execution_timeout) 
> * (retries + 1))
>  # Smart sensor DAG handle. Should it be manually or in source code.
>  # Special logic for smart sensor health check.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to