Hi,
I have a MySQL table that will store some static information. The
information can differ between Airflow runs, so I would like to use
Python code to initialize it whenever Airflow starts.
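For reference, the kind of initialization I mean is roughly the sketch
below. The table name, columns, and values are just placeholders; in
practice I would open a MySQL connection (with %s placeholders), but I am
showing it with sqlite3 here so the snippet is self-contained.

```python
import sqlite3  # stand-in for MySQL; any DB-API connection works the same way


def init_static_table(conn):
    """(Re)create and populate the static lookup table for this run."""
    cur = conn.cursor()
    cur.execute("DROP TABLE IF EXISTS run_config")
    cur.execute("CREATE TABLE run_config (key TEXT PRIMARY KEY, value TEXT)")
    # The values may differ between runs, so they are computed here each time.
    # (MySQL would use %s instead of ? as the parameter placeholder.)
    rows = [("env", "dev"), ("batch_size", "100")]
    cur.executemany("INSERT INTO run_config (key, value) VALUES (?, ?)", rows)
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
print(init_static_table(conn))  # → 2
```

The question is only about where to hook a function like this in so it
runs once per Airflow start.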
Where is the best place to put such code?
Is DagBag's __init__() a good candidate?
Please advise.
Thanks.
#############################################
class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree and has high
    level configuration settings, like what database to use as a backend and
    what executor to use to fire off tasks. This makes it easier to run
    distinct environments for say production and development, tests, or for
    different teams or security profiles. What would have been system level
    settings are now dagbag level so that one system can run multiple,
    independent settings sets.

    :param dag_folder: the folder to scan to find DAGs
    :type dag_folder: str
    :param executor: the executor to use when executing task instances
        in this DagBag
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :type include_examples: bool
    :param sync_to_db: whether to sync the properties of the DAGs to
        the metadata DB while finding them, typically should be done
        by the scheduler job only
    :type sync_to_db: bool
    """

    def __init__(
            self,
            dag_folder=None,
            executor=DEFAULT_EXECUTOR,
            include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'),
            sync_to_db=False):
        dag_folder = dag_folder or DAGS_FOLDER
        self.logger.info("Filling up the DagBag from {}".format(dag_folder))
        self.dag_folder = dag_folder
        self.dags = {}
        self.sync_to_db = sync_to_db
        self.file_last_changed = {}
        self.executor = executor
        self.import_errors = {}
        if include_examples:
            example_dag_folder = os.path.join(
            ...
#############################