A DAG Run is a run of a particular DAG at a particular time. For example, if your DAG were called FOO and its schedule interval were @hourly, then you would have DAG runs: FOO @ noon, FOO @ 1pm, FOO @ 2pm, etc.
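For reference, a minimal sketch of such a DAG definition (the dag_id and start_date here are just placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# one DAG run is created per hour: FOO @ noon, FOO @ 1pm, FOO @ 2pm, ...
dag = DAG(dag_id='FOO',
          start_date=datetime(2016, 11, 1),
          schedule_interval='@hourly')

# a single no-op task, just so the DAG has something to run
DummyOperator(task_id='noop', dag=dag)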
If you need to seed a table prior to each run, just add a task at the start of your DAG to load the target table. You can do this simply by using a PythonOperator with your own custom Python callable -- the callable would insert the necessary data. I would warn that it is possible for Airflow to schedule DAG runs for FOO in parallel unless you specify depends_on_past=True. You should probably structure your table to be keyed by the DAG run so that you can run multiple DAG runs concurrently (i.e. depends_on_past=False). A rough sketch follows the quoted message below.

-s

On Tue, Nov 1, 2016 at 7:09 AM, Michael Gong <[email protected]> wrote:
> Hi,
>
> I have a MySQL table which will store some static information. The
> information could be different for different airflow runs, so I hope to use
> python code to initialize it whenever airflow starts.
>
> Where is the best place to put such code?
>
> Is the class DagBag's __init__() a good candidate?
>
> Please advise.
>
> Thanks.
>
> #############################################
>
> class DagBag(LoggingMixin):
>     """
>     A dagbag is a collection of dags, parsed out of a folder tree and has
>     high level configuration settings, like what database to use as a
>     backend and what executor to use to fire off tasks. This makes it
>     easier to run distinct environments for say production and development,
>     tests, or for different teams or security profiles. What would have
>     been system level settings are now dagbag level so that one system can
>     run multiple, independent settings sets.
>
>     :param dag_folder: the folder to scan to find DAGs
>     :type dag_folder: str
>     :param executor: the executor to use when executing task instances
>         in this DagBag
>     :param include_examples: whether to include the examples that ship
>         with airflow or not
>     :type include_examples: bool
>     :param sync_to_db: whether to sync the properties of the DAGs to
>         the metadata DB while finding them, typically should be done
>         by the scheduler job only
>     :type sync_to_db: bool
>     """
>     def __init__(
>             self,
>             dag_folder=None,
>             executor=DEFAULT_EXECUTOR,
>             include_examples=configuration.getboolean('core',
>                                                       'LOAD_EXAMPLES'),
>             sync_to_db=False):
>
>         dag_folder = dag_folder or DAGS_FOLDER
>         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
>         self.dag_folder = dag_folder
>         self.dags = {}
>         self.sync_to_db = sync_to_db
>         self.file_last_changed = {}
>         self.executor = executor
>         self.import_errors = {}
>         if include_examples:
>             example_dag_folder = os.path.join(
>
> ...
>
> #############################
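Here is the rough sketch mentioned above. The table name static_info, its columns, and the connection id my_mysql are made up for illustration; the import paths are the 1.x-era module locations:

from datetime import datetime
from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='FOO',
          start_date=datetime(2016, 11, 1),
          schedule_interval='@hourly',
          # depends_on_past=False is the default; runs may overlap,
          # so the seed rows are keyed by execution_date below
          default_args={'depends_on_past': False})

def seed_table(**context):
    # key the row by this run's execution_date so that concurrent
    # DAG runs each see their own seed data
    hook = MySqlHook(mysql_conn_id='my_mysql')  # hypothetical connection id
    hook.run(
        "INSERT INTO static_info (execution_date, payload) VALUES (%s, %s)",
        parameters=(context['execution_date'], 'some value'))

seed = PythonOperator(task_id='seed_table',
                      python_callable=seed_table,
                      provide_context=True,  # passes execution_date etc.
                      dag=dag)

# downstream tasks would read from static_info, filtering on their own
# run's execution_date, e.g. seed.set_downstream(rest_of_dag)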
