[
https://issues.apache.org/jira/browse/AIRFLOW-392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Daniel Imberman closed AIRFLOW-392.
-----------------------------------
Resolution: Auto Closed
> DAG runs on strange schedule in the past when deployed
> ------------------------------------------------------
>
> Key: AIRFLOW-392
> URL: https://issues.apache.org/jira/browse/AIRFLOW-392
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.7.1.3
> Environment: AWS ElasticBeanstalk as a Docker application running in
> an Ubuntu-based container
> Reporter: David Klosowski
> Assignee: Norman Mu
> Priority: Major
>
> Just deployed a new DAG ('weekly-no-track') that depends on 7 DAG task runs
> of another DAG ('daily-no-track'). When the DAG is deployed the scheduler
> schedules and runs multiple runs in the past (yesterday it ran for 6/12/2016
> and 6/05/2016), despite the start date set to the deployment date.
> It would be a bit difficult to include all the code being used in the DAG
> since we have multiple libraries we've built in Python that are being
> referenced here that we want to eventually open source. I've included some
> of the code here. Let me know if this is all clear and what I can do to help
> or if any insight can be provided as to why it is occurring and how we might
> fix this.
> {code}
> from __future__ import division, print_function
> from airflow.models import DAG
> from airflow.operators import DummyOperator, ExternalTaskSensor,
> TimeDeltaSensor
> from tn_etl_tools.aws.emr import EmrConfig, HiveConfig, read_cluster_templates
> from tn_etl_tools.aws.emr import EmrService, EmrServiceWrapper,
> HiveStepBuilder
> from tn_etl_tools.datesupport import ts_add
> from tn_etl_tools.hive import HivePartitions
> from tn_etl_tools.yaml import YamlLoader
> from datetime import datetime, timedelta
> from dateutil.relativedelta import relativedelta, SU, MO , TU, WE, TH, FR, SA
> from common_args import merge_dicts, CommonHiveParams
> from operator_builders import add_tasks, emr_hive_operator
> import os
> # === configs
> config_dir = os.getenv('DAG_CONFIG_DIR', '/usr/local/airflow/config')
> alert_email = os.getenv('AIRFLOW_TO_EMAIL')
> app_properties = YamlLoader.load_yaml(config_dir + '/app.yml')
> emr_cluster_properties = YamlLoader.load_yaml(config_dir +
> '/emr_clusters.yml')
> emr_config = EmrConfig.load(STAGE=app_properties['STAGE'],
> **app_properties['EMR'])
> hive_config = HiveConfig.load(STAGE=app_properties['STAGE'],
> **app_properties['HIVE'])
> emr_cluster_templates = read_cluster_templates(emr_cluster_properties)
> # === /configs
> # TODO: force execution_date = sunday?
> run_for_date = datetime(2016, 8, 8)
> emr_service = EmrService()
> emr_service_wrapper = EmrServiceWrapper(emr_service=emr_service,
> emr_config=emr_config,
> cluster_templates=emr_cluster_templates)
> hive_step_builder = HiveStepBuilder(hive_config=hive_config)
> hive_params = CommonHiveParams(app_properties_hive=app_properties['HIVE'])
> args = {'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': run_for_date,
> 'email': [alert_email],
> 'email_on_failure': True,
> 'email_on_retry': False,
> 'retries': 1,
> 'trigger_rule' : 'all_success',
> 'emr_service_wrapper': emr_service_wrapper,
> 'hive_step_builder': hive_step_builder}
> user_defined_macros = {'hive_partitions': HivePartitions,
> 'ts_add': ts_add}
> params = {'stage': app_properties['STAGE']}
> dag = DAG(dag_id='weekly_no_track', default_args=args,
> user_defined_macros=user_defined_macros, params=params,
> schedule_interval=timedelta(days=7),
> max_active_runs=1)
> # === task definitions
> task_definitions = {
> 'wait-for-dailies': {
> 'operator_type': 'dummy_operator', # hub for custom defined
> dependencies
> 'operator_args': {},
> 'depends_on': []
> },
> 'weekly-no-track': {
> 'operator_type': 'emr_hive_operator',
> 'operator_args': {
> 'hive_step': {
> 'script': 'weekly-no-track-airflow', # temporary modified
> script with separate output path
> 'cluster_name': 'geoprofile',
> 'script_vars': merge_dicts(hive_params.default_params(),
> hive_params.rundate_params(), {
> 'PARTITIONS': '{{hive_partitions.by_day(ts_add(ts,
> days=-6), ts_add(ts, days=1))}}',
> }),
> }
> },
> 'depends_on': ['wait-for-dailies']
> }
> }
> # === /task definitions
> operator_builders = {'emr_hive_operator': emr_hive_operator,
> 'time_delta_sensor': TimeDeltaSensor,
> 'dummy_operator': DummyOperator}
> add_tasks(task_definitions, dag=dag, operator_builders=operator_builders)
> # === custom tasks
> downstream_task = dag.get_task('wait-for-dailies')
> for weekday in [MO, TU, WE, TH, FR, SA, SU]:
> task_id = 'wait-for-daily-{day}'.format(day=weekday)
> # weekday(-1) subtracts 1 relative week from the given weekday, however
> if the calculated date is already Monday,
> # for example, -1 won't change the day.
> delta = relativedelta(weekday=weekday(-1))
> sensor = ExternalTaskSensor(task_id=task_id, dag=dag,
> external_dag_id='daily_no_track',
> external_task_id='daily-no-track',
> execution_delta=delta, timeout=86400) #
> 86400 = 24 hours
> sensor.set_downstream(downstream_task)
> # === /custom tasks
> {code}
> Some referenced code
> {{common_args.py}}
> {code}
> from __future__ import division, print_function
> from copy import copy
> class CommonHiveParams(object):
> def __init__(self, app_properties_hive):
> super(CommonHiveParams, self).__init__()
> # TODO: this should be part of a config object
> self.app_properties_hive = app_properties_hive
> def default_params(self):
> return {
> 'HIVE_LIBS_BUCKET':
> self.app_properties_hive['S3_HIVE_LIB_BUCKET'],
> 'STAGE': '{{params.stage}}',
> }
> @staticmethod
> def rundate_params():
> return {
> 'YEAR': '{{execution_date.strftime("%Y")}}',
> 'MONTH': '{{execution_date.strftime("%m")}}',
> 'DAY': '{{execution_date.strftime("%d")}}',
> 'HOUR': '{{execution_date.strftime("%H")}}',
> 'MINUTE': '{{execution_date.strftime("%M")}}',
> }
> def merge_dicts(*dicts):
> """ Merge provided dicts without modification.
> Duplicate keys are overwritten with values from the rightmost applicable
> dict.
> """
> if len(dicts) == 0:
> return {}
> result = copy(dicts[0])
> for d in dicts[1:]:
> result.update(d)
> return result
> {code}
> {{operator_builders.py}}
> {code}
> """Functions for building operators from dict property definitions."""
> from __future__ import division, print_function
> from tn_airflow_components.operators.emr import EmrHiveOperator,
> create_emr_operator_with_step_sensor
> # TODO: this should not be a single package. Not every DAG needs EMR as a
> dependency, for example.
> def emr_hive_operator(task_id, dag, hive_step, **kwargs):
> return create_emr_operator_with_step_sensor(task_id=task_id, dag=dag,
>
> main_operator_class=EmrHiveOperator, main_operator_kwargs=hive_step,
> **kwargs)
> def add_tasks(task_definitions, dag, operator_builders):
> """Add tasks from dict definitions
> :param task_definitions: dict of task definitions. Keys in the top-level
> dict are used as the task IDs
> :type task_definitions: dict
> :param dag: the DAG in which to define the tasks
> :type dag: airflow.models.DAG
> :param operator_builders: mapping of str 'operator_type' values to
> operator builder functions
> :type operator_builders: dict
> """
> for task_id in task_definitions.keys():
> task_definition = task_definitions[task_id]
> operator_type = task_definition['operator_type']
> operator = operator_builders[operator_type](task_id=task_id, dag=dag,
> **task_definition['operator_args'])
> if task_definition['depends_on']:
> for dependency in task_definition['depends_on']:
> operator.set_upstream(dag.get_task(dependency))
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)