abhishekshenoy commented on pull request #19592:
URL: https://github.com/apache/airflow/pull/19592#issuecomment-1020839406


   @potiuk Upgrading will be an issue as it involves a long wait time. Have 
come up with the below 2 approaches as a fix for the time being. 
   
   -  Using Xcom
   
   ```
       def getRecvdDate(**kwargs):
           next_exec_date = kwargs.get('templates_dict').get('next_exec_date')
           days_delta = kwargs.get('templates_dict').get('time_delta')
           dt=datetime.strptime(next_exec_date, '%Y-%m-%d') + 
timedelta(days=float(days_delta))
           kwargs['ti'].xcom_push(key='recvd_date', 
value=dt.strftime("%Y-%m-%d"))
           return
   
   
       generate_next_exec_date_minus_one = 
PythonOperator(task_id='generate_next_exec_date_minus_one',
                           provide_context=True,
                           python_callable=getRecvdDate,
                           templates_dict={'next_exec_date': '{{ next_ds }}',
                                           'time_delta': '{{ 
var.json.macros_test.time_delta }}'},
                           dag=dag)
   
   
       xcom_based_recvd_date = GCSUploadSessionCompleteSensor(
           task_id='xcom_based_recvd_date',
           bucket=BUCKET_NAME,
           prefix="macros-test/recvd_dt={{ 
task_instance.xcom_pull(task_ids='generate_next_exec_date_minus_one', 
key='recvd_date') }}/",
           impersonation_chain=IMPERSONATE_SERVICE_ACCOUNT,
           timeout=1,
           soft_fail=True,
           dag=dag
       )
   ```
   
   - Using user-defined-macro
   ```
   import croniter
    
   sched = '0 6 * * sat'    
   
   Define below under dag_args :
   'user_defined_macros': {
       'custom_next_exec_date': next_exec_date,
   }
    
   def next_exec_date(dt,days_delta):
       cron = croniter.croniter(sched, dt)
       return cron.get_next(datetime) + timedelta(days=days_delta) 
    
   SOURCE_OBJECT = "{{ var.json.macros-test.sourcedir }}/recvd_dt={{ ( 
custom_next_exec_date(ds,-1) ) }}/"
    
   # Task 1: Checking the directory for file availability
   file_check = GCSObjectsWtihPrefixExistenceSensor(
       task_id="check_current_date_directory_existence",
       bucket=INBOX_BUCKET,
       prefix=SOURCE_OBJECT,
       impersonation_chain=GCS_IMPERSONATE_SERVICE_ACCOUNT,
       timeout=5,
       soft_fail=False,
       dag=dag
   )
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to