[ https://issues.apache.org/jira/browse/AIRFLOW-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kasim updated AIRFLOW-5927: --------------------------- Description: I have a `config.py` pull configure from `Variable` and merge into default config : {code:java} from datetime import datetime from airflow.models import Variable class Config: version = "V21" etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}' forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}' forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}' forecast_result_s3_dir = f's3a://xxxxx/data/dm/sales_forecast/fbprophet/version={version}' etl_dir = '/data/dm/sales_forecast/etl' feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT' dag_start_date = datetime(2019, 10, 25) etl_start_time = "2019-06-01 00:00:00" etl_end_time = " (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') " train_start_time = " (execution_date macros.timedelta(days=90)).strftime('%Y%m-%d 00:00:00') " train_end_time = " execution_date.strftime('%Y-%m-%d 00:00:00') " predict_start_time = " execution_date.strftime('%Y-%m-%d 00:00:00') " predict_end_time = " (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') " report_start_date = " (execution_date macros.timedelta(days=6)).strftime('%Y%m-%d 00:00:00') " report_end_date = " execution_date.strftime('%Y-%m-%d 00:00:00') " sf_schedule_report = "30 8 * " sf_schedule_etl = '30 1 * ' sf_schedule_main_flow = "45 2 * " CONFIG_KEY = 'sf_config_%s' % Config.version sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) if sf_config: for k, v in sf_config.items(): print(f'Overwrite {k} by {v}') if hasattr(Config, k): if k == 'dag_start_date': setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) elif v == 'None': setattr(Config, k, None) else: setattr(Config, k, v) {code} And I have 5 dag file import this Config . 
they have some similar code like {code:java} from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.models import Variable from sf_dags_n.config import Config default_args = { 'owner': 'mithril', 'depends_on_past': False, 'email': ['mithril'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, 'schedule_interval': schedule_interval,} {code} {code:java} dag = DAG('dm_sfx_etl_%s' % Config.version, start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') , default_args=default_args, schedule_interval='20 1 * ', user_defined_filters= { 'mod' : lambda s, d:s%d } , ) {code} The strange thing is: changing `sf_schedule_etl` in Variable took effect several times, but at some point I couldn't change it from the variable any more, even when I hard-coded it directly: ``` dag = DAG('dm_sfx_etl_%s' % Config.version, start_date=Config.dag_start_date , default_args=default_args, schedule_interval='20 1 ** * ', user_defined_filters= \{ 'mod' : lambda s, d:s%d } , ) ``` Once this situation occurred, even deleting the dag file and deleting the dag from the airflow web UI didn't change `schedule_interval` . PS: my dag files have been running for some days; during that time I may have added some operators or changed some operator types, but everything was still fine. I think there must be some cache in airflow that leads to this problem. 
was: I have a `config.py` pull configure from `Variable` and merge into default config : {code:java} from datetime import datetime from airflow.models import Variable class Config: version = "V21" etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}' forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}' forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}' forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}' etl_dir = '/data/dm/sales_forecast/etl' feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT' dag_start_date = datetime(2019, 10, 25) etl_start_time = "2019-06-01 00:00:00" etl_end_time = " (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') " train_start_time = " (execution_date macros.timedelta(days=90)).strftime('%Y%m-%d 00:00:00') " train_end_time = " execution_date.strftime('%Y-%m-%d 00:00:00') " predict_start_time = " execution_date.strftime('%Y-%m-%d 00:00:00') " predict_end_time = " (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') " report_start_date = " (execution_date macros.timedelta(days=6)).strftime('%Y%m-%d 00:00:00') " report_end_date = " execution_date.strftime('%Y-%m-%d 00:00:00') " sf_schedule_report = "30 8 * " sf_schedule_etl = '30 1 * ' sf_schedule_main_flow = "45 2 * " CONFIG_KEY = 'sf_config_%s' % Config.version sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) if sf_config: for k, v in sf_config.items(): print(f'Overwrite {k} by {v}') if hasattr(Config, k): if k == 'dag_start_date': setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) elif v == 'None': setattr(Config, k, None) else: setattr(Config, k, v) {code} And I have 5 dag file import this Config . 
they have some similar code like {code:java} from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.models import Variable from sf_dags_n.config import Config default_args = { 'owner': 'mithril', 'depends_on_past': False, 'email': ['mithril'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 2, 'schedule_interval': schedule_interval,} {code} {code:java} dag = DAG('dm_sfx_etl_%s' % Config.version, start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') , default_args=default_args, schedule_interval='20 1 * ', user_defined_filters= { 'mod' : lambda s, d:s%d } , ) {code} The stange thing is : Change `sf_schedule_etl` in Variable took effect several times, but at some point , I couldn't change it from variable any more, even I directly hard code it : ``` dag = DAG('dm_sfx_etl_%s' % Config.version, start_date=Config.dag_start_date , default_args=default_args, schedule_interval='20 1 * ** ', user_defined_filters= \{ 'mod' : lambda s, d:s%d } , ) ``` If such situation came, even delete dag file and delete from airflow webui ,didn't change `schedule_interval` . PS: my dag file have running for some days, in these days ,I may add some operator to it , or change some operator type, but it still fine . I think there must be some cache in airflow lead to this problem. 
> Airflow cache import file or variable > ------------------------------------- > > Key: AIRFLOW-5927 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5927 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, database > Affects Versions: 1.10.3 > Reporter: kasim > Priority: Major > > I have a `config.py` pull configure from `Variable` and merge into default > config : > > {code:java} > from datetime import datetime > from airflow.models import Variable > class Config: > version = "V21" > etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}' > forecast_result_dir = > f'/data/dm/sales_forecast/results/fbprophet/version={version}' > forecast_model_dir = > f'/data/dm/sales_forecast/models/fbprophet/version={version}' > forecast_result_s3_dir = > f's3a://xxxxx/data/dm/sales_forecast/fbprophet/version={version}' > > etl_dir = '/data/dm/sales_forecast/etl' > feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT' > dag_start_date = datetime(2019, 10, 25) > etl_start_time = "2019-06-01 00:00:00" > etl_end_time = " (execution_date + > macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') " > train_start_time = " (execution_date > macros.timedelta(days=90)).strftime('%Y%m-%d 00:00:00') " > train_end_time = " execution_date.strftime('%Y-%m-%d 00:00:00') " > predict_start_time = " execution_date.strftime('%Y-%m-%d 00:00:00') " > predict_end_time = " (execution_date + > macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') " > report_start_date = " (execution_date > macros.timedelta(days=6)).strftime('%Y%m-%d 00:00:00') " > report_end_date = " execution_date.strftime('%Y-%m-%d 00:00:00') " > sf_schedule_report = "30 8 * " > sf_schedule_etl = '30 1 * ' > sf_schedule_main_flow = "45 2 * " > CONFIG_KEY = 'sf_config_%s' % Config.version > sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) > if sf_config: > for k, v in sf_config.items(): > print(f'Overwrite {k} by {v}') > if hasattr(Config, k): > if k == 'dag_start_date': > 
setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) > elif v == 'None': > setattr(Config, k, None) > else: > setattr(Config, k, v) > > {code} > > > > And I have 5 dag file import this Config . they have some similar code like > > {code:java} > > from datetime import datetime, timedelta > from airflow import DAG > from airflow.operators.dummy_operator import DummyOperator > from airflow.operators.bash_operator import BashOperator > from airflow.operators.dagrun_operator import TriggerDagRunOperator > from airflow.models import Variable > from sf_dags_n.config import Config > default_args = { 'owner': 'mithril', 'depends_on_past': False, > 'email': ['mithril'], 'email_on_failure': False, 'email_on_retry': > False, 'retries': 2, 'schedule_interval': schedule_interval,} > > {code} > {code:java} > dag = DAG('dm_sfx_etl_%s' % Config.version, > start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d') , > default_args=default_args, > schedule_interval='20 1 * ', > user_defined_filters= { 'mod' : lambda s, d:s%d } > , > ) > {code} > > > The stange thing is : > > Change `sf_schedule_etl` in Variable took effect several times, but at some > point , I couldn't change it from variable any more, even I directly hard > code it : > > ``` > dag = DAG('dm_sfx_etl_%s' % Config.version, > start_date=Config.dag_start_date , > default_args=default_args, > schedule_interval='20 1 ** * ', > user_defined_filters= \{ 'mod' : lambda s, d:s%d } > , > ) > ``` > > If such situation came, even delete dag file and delete from airflow webui > ,didn't change `schedule_interval` . > > PS: my dag file have running for some days, in these days ,I may add some > operator to it , or change some operator type, but it still fine . I think > there must be some cache in airflow lead to this problem. > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)