[ https://issues.apache.org/jira/browse/AIRFLOW-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5927:
---------------------------
    Description: 
I have a `config.py` that pulls configuration from `Variable` and merges it into a default config:

 
{code:java}
from datetime import datetime

from airflow.models import Variable


class Config:
    version = "V21"
    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://xxxxx/data/dm/sales_forecast/fbprophet/version={version}'

    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'
    dag_start_date = datetime(2019, 10, 25)
    etl_start_time = "2019-06-01 00:00:00"
    # Jinja template fragments, rendered by Airflow at task run time
    etl_end_time = " (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') "
    train_start_time = " (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') "
    train_end_time = " execution_date.strftime('%Y-%m-%d 00:00:00') "
    predict_start_time = " execution_date.strftime('%Y-%m-%d 00:00:00') "
    predict_end_time = " (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') "
    report_start_date = " (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') "
    report_end_date = " execution_date.strftime('%Y-%m-%d 00:00:00') "
    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = "30 1 * * *"
    sf_schedule_main_flow = "45 2 * * *"


# Merge overrides stored in the Airflow Variable into the defaults above
CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})
if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            elif v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
{code}
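For context, the Variable is a JSON blob keyed by version. A minimal sketch of how such an override payload can be stored (the override values here are invented for illustration; only keys that already exist on `Config` are applied):

{code:java}
from airflow.models import Variable

# Hypothetical override payload for the CONFIG_KEY used above
overrides = {
    'sf_schedule_etl': '0 2 * * *',
    'dag_start_date': '2019-11-01',
}
Variable.set('sf_config_V21', overrides, serialize_json=True)
{code}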
 

 

 

And I have 5 DAG files that import this Config. They all share similar code, like:
{code:java}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

# Assumed: each DAG file picks the Config.sf_schedule_* value it needs
schedule_interval = Config.sf_schedule_etl

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'schedule_interval': schedule_interval,
}
{code}
{code:java}
dag = DAG(
    'dm_sfx_etl_%s' % Config.version,
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
    default_args=default_args,
    schedule_interval='20 1 * * *',
    user_defined_filters={'mod': lambda s, d: s % d},
)
{code}
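As an aside, a `schedule_interval` key inside `default_args` is ignored as far as I can tell; Airflow only honours the value passed to the `DAG` constructor. So for the Variable override to take effect, it has to be wired in there. A minimal sketch using the `Config` fields above:

{code:java}
# Minimal sketch: drive the schedule from the merged Config instead of a literal,
# so a Variable change (after a scheduler re-parse) can actually move it
dag = DAG(
    'dm_sfx_etl_%s' % Config.version,
    start_date=Config.dag_start_date,
    default_args=default_args,
    schedule_interval=Config.sf_schedule_etl,
)
{code}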
 
  
The strange thing is:

Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I couldn't change it through the Variable any more, not even by hard-coding it directly:

{code:java}
dag = DAG(
    'dm_sfx_etl_%s' % Config.version,
    start_date=Config.dag_start_date,
    default_args=default_args,
    schedule_interval='20 1 * * *',
    user_defined_filters={'mod': lambda s, d: s % d},
)
{code}
  
Once this happened, even deleting the DAG file and deleting the DAG from the Airflow web UI didn't change `schedule_interval`.
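
One way to check what the scheduler has actually recorded is to read the `dag` table in the metadata database. A diagnostic sketch, assuming `DagModel` carries a `schedule_interval` column in this release (it does in recent 1.10.x versions as far as I know):

{code:java}
from airflow import settings
from airflow.models import DagModel

# Inspect the schedule the metadata DB holds for the affected DAG
session = settings.Session()
dm = session.query(DagModel).filter(DagModel.dag_id == 'dm_sfx_etl_V21').first()
if dm is not None:
    print(dm.fileloc, dm.schedule_interval)
session.close()
{code}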
  
PS: my DAG files had been running for some days. During those days I added some operators and changed some operator types, and everything was still fine. I think there must be some cache in Airflow that leads to this problem.
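
If the cache is simply stale Python bytecode for the shared `sf_dags_n` package, that is easy to rule out. A speculative sketch (the path is assumed from the imports above); restart the scheduler and webserver afterwards, since both re-import `sf_dags_n.config`:

{code:java}
import pathlib

# Speculative cleanup: delete stale .pyc files that could mask edits to config.py
for pyc in pathlib.Path('sf_dags_n').rglob('*.pyc'):
    pyc.unlink()
{code}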
  
  
  
  
  


> Airflow cache import file or variable
> -------------------------------------
>
>                 Key: AIRFLOW-5927
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5927
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG, database
>    Affects Versions: 1.10.3
>            Reporter: kasim
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
