Another way to do this is for t3 to do an S3 directory listing and find the latest directories that need processing. When it finishes processing a directory from t1, it deletes it. t1 should add a final marker file to each directory that means "I'm finished, take this directory".
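Roughly like this, untested, using boto3 directly (the bucket name, prefix layout, and marker-file name are made up for illustration; t1 would call mark_done() as its last step, and t3 would loop over finished_directories()):

import boto3

s3 = boto3.client('s3')
BUCKET = 'aaaaa'    # hypothetical bucket
ROOT = 'bbbbb/'     # hypothetical prefix; one sub-"directory" per t1 run
MARKER = '_DONE'    # sentinel object t1 writes when a directory is complete

def mark_done(run_prefix):
    # Last step of t1: drop an empty marker object into the run's directory.
    s3.put_object(Bucket=BUCKET, Key=run_prefix + MARKER, Body=b'')

def finished_directories():
    # t3: list sub-directories under ROOT and keep only the ones with a marker.
    resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=ROOT, Delimiter='/')
    for cp in resp.get('CommonPrefixes', []):
        prefix = cp['Prefix']
        marker = s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix + MARKER)
        if marker.get('KeyCount', 0) > 0:
            yield prefix

def delete_directory(run_prefix):
    # t3, after processing: remove every object under the directory.
    resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=run_prefix)
    for obj in resp.get('Contents', []):
        s3.delete_object(Bucket=BUCKET, Key=obj['Key'])

(Pagination is ignored for brevity; a real version would follow ContinuationToken.)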
This is a very fail-safe approach. It changes the model from a pipeline:

    t1[time 1] supplies data to -> t3[time 1]

to:

    t1[time 1], t1[time 2], ... t1[time n] all supply data to -> t3[time n]

On Mon, Jul 11, 2016 at 5:43 PM, MSR M <[email protected]> wrote:

> Hi,
>
> I need some advice in solving a problem with local variables in a DAG.
>
> I have a DAG (schedule interval 30 mins). It has 3 tasks. t1 runs a
> python program on a remote EC2 instance. t2 waits for S3 file availability
> at a particular location. This S3 file is created by t1. Once the S3 file
> is available, t3 runs and processes the file on S3.
>
> I have the date-time as part of my S3 file location:
>
> dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
> bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
>
> t1 runs for more than 1 hour, so a second instance of the DAG has already
> started and it changes the value of dttm2, which means task t2 of the
> first run is trying to locate the file at a different location.
>
> To overcome this I am planning to use the parameter {{execution_date}}
> instead of getting the dttm2 value as shown above.
>
> In situations like these, is there a better approach to keep the same
> value in a variable throughout a particular run of the DAG?
>
> Or should I use the XCom feature to push and pull the values across the
> tasks, with different keys for each run?
>
> Part of my DAG is given below:
>
> dttm2 = datetime.now().strftime('%Y-%m-%d-%H-%M')
>
> NL = """
> cd /home/ubuntu/Scripts/ ; python2 a11.py {{params.get("dttm2")}} ;
> """
>
> t1 = BashOperator(
>     task_id='E_Ns_A',
>     bash_command=NL,
>     params={'dttm2': dttm2},
>     retries=3,
>     dag=dag)
>
> bucket_key2 = "s3://aaaaa/bbbbb/" + dttm2 + "/sucess"
>
> def detect_s3(name, dag=dag, upstream=t1):
>     task = S3KeySensor(
>         task_id=name,
>         bucket_key=bucket_key2,
>         s3_conn_id='s3conn',
>         dag=dag,
>         wildcard_match=True)
>     task.set_upstream(upstream)
>     return task
>
> # Spark module to classify data
>
> bucket_key3 = "s3://aaaaa/bbbbb/" + dttm2 + "/"
> sparkcmd = """
> cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python NbRunner.py;
> aws s3 cp /home/ubuntu/NC.txt {{params.get("bkey")}} --region us-west-1
> """
>
> t3 = BashOperator(
>     task_id='CNs',
>     bash_command=sparkcmd,
>     params={"bkey": bucket_key3},
>     retries=1,
>     dag=dag)
>
> t2 = detect_s3('t2')
>
> t3.set_upstream(t2)
>
> Thanks,
> MSR

--
Lance Norskog
[email protected]
Redwood City, CA
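For the {{execution_date}} idea in the quoted mail, a minimal sketch of what that could look like, so that t1, t2, and t3 of the same DAG run all render the same path (task ids, placeholder paths, and connection id are copied from the quoted DAG; untested, written against 1.x-era import paths, and `dag` is the same DAG object defined in the quoted file):

from airflow.operators import BashOperator
from airflow.operators.sensors import S3KeySensor

# execution_date is fixed per DAG run, unlike datetime.now() at parse time.
TS = "{{ execution_date.strftime('%Y-%m-%d-%H-%M') }}"
run_path = "s3://aaaaa/bbbbb/" + TS

t1 = BashOperator(
    task_id='E_Ns_A',
    bash_command="cd /home/ubuntu/Scripts/ ; python2 a11.py " + TS + " ;",
    retries=3,
    dag=dag)

# bucket_key is a templated field in recent Airflow releases; if your version
# does not template it, push the rendered path through XCom instead.
t2 = S3KeySensor(
    task_id='t2',
    bucket_key=run_path + "/sucess",
    s3_conn_id='s3conn',
    wildcard_match=True,
    dag=dag)

t3 = BashOperator(
    task_id='CNs',
    bash_command="cd /home/ubuntu/SC; /home/ubuntu/anaconda3/bin/python NbRunner.py; "
                 "aws s3 cp /home/ubuntu/NC.txt " + run_path + "/ --region us-west-1",
    retries=1,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)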
