YingboWang commented on a change in pull request #5499: URL: https://github.com/apache/airflow/pull/5499#discussion_r467362604
########## File path: airflow/models/sensorinstance.py ########## @@ -0,0 +1,166 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json + +from sqlalchemy import BigInteger, Column, Index, Integer, String, Text + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.models.base import ID_LEN, Base +from airflow.utils import timezone +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime +from airflow.utils.state import State + + +class SensorInstance(Base): + """ + SensorInstance support the smart sensor service. It stores the sensor task states + and context that required for poking include poke context and execution context. + In sensor_instance table we also save the sensor operator classpath so that inside + smart sensor there is no need to import the dagbag and create task object for each + sensor task. + + SensorInstance include another set of columns to support the smart sensor shard on + large number of sensor instance. By hashcode generated from poke_contex and shardcode + the distributed smart sensor processes can work on different shards. 
+ + """ + + __tablename__ = "sensor_instance" + + id = Column(Integer, primary_key=True) + task_id = Column(String(ID_LEN), nullable=False) + dag_id = Column(String(ID_LEN), nullable=False) + execution_date = Column(UtcDateTime, nullable=False) + state = Column(String(20)) + _try_number = Column('try_number', Integer, default=0) + start_date = Column(UtcDateTime) + operator = Column(String(1000), nullable=False) + op_classpath = Column(String(1000), nullable=False) + hashcode = Column(BigInteger, nullable=False) + shardcode = Column(Integer, nullable=False) + poke_context = Column(Text, nullable=False) + execution_context = Column(Text) + created_at = Column(UtcDateTime, default=timezone.utcnow(), nullable=False) + updated_at = Column(UtcDateTime, + default=timezone.utcnow(), + onupdate=timezone.utcnow(), + nullable=False) + + __table_args__ = ( + Index('ti_primary_key', dag_id, task_id, execution_date, unique=True), + + Index('si_hashcode', hashcode), + Index('si_shardcode', shardcode), + Index('si_state_shard', state, shardcode), + Index('si_updated_at', updated_at), + ) + + def __init__(self, ti): + self.dag_id = ti.dag_id + self.task_id = ti.task_id + self.execution_date = ti.execution_date + + @staticmethod + def get_classpath(obj): + """ + Get the object dotted class path. Used for getting operator classpath + :param obj: + :return: string + """ + module_name, class_name = obj.__module__, obj.__class__.__name__ + + if '.' not in module_name: + # Fix if module name was broken by airflow importer Review comment: Good catch. This block works around an issue in the old Airflow importer, which I believe is deprecated now, so I will remove it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
