GitHub user Srichandra-razor created a discussion: Airflow DAG Scheduling issue

> Hi team,
> 
> Apologies — the code I shared earlier was an updated version. I kindly 
> request that this issue be reopened for further review.
> 
> To clarify, the previous version of the DAG used the `days_ago` function from
> `airflow.utils.dates` to set the `start_date`, which affects the scheduling
> logic and historical backfill behavior; a short sketch of the difference is
> included below.
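> 
> As a minimal illustration (the pinned date below is just an example, not taken
> from our DAG): `days_ago(1)` is recomputed every time the DAG file is parsed,
> whereas a fixed `pendulum.datetime` keeps the start_date stable across parses:
> 
> ```python
> import pendulum
> from airflow.utils.dates import days_ago
> 
> # Dynamic: midnight UTC of the previous day at parse time, so the effective
> # start_date keeps moving forward as the scheduler re-parses the file.
> dynamic_start = days_ago(1)
> 
> # Static: a pinned, timezone-aware date; schedule intervals and any backfill
> # window are computed from this fixed point.
> static_start = pendulum.datetime(2024, 1, 1, tz="UTC")
> ```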
> 
> Here is a snippet from the older version of the DAG:
> 
> ```python
> from multiprocessing import Event
> import os
> import resource
> import time
> import json
> from datetime import datetime, date, timedelta
> from io import BytesIO
> from uuid import uuid4 as v4
> from itertools import groupby
> from operator import itemgetter
> import concurrent.futures
> 
> import requests
> import boto3
> from boto3.dynamodb.conditions import Key, Attr
> import pandas as pd
> import numpy as np
> import pyarrow.parquet as pq
> 
> from airflow import DAG
> from airflow.models import Variable
> from airflow.models.baseoperator import chain
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.dummy_operator import DummyOperator
> # from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
> from airflow.providers.postgres.operators.postgres import PostgresOperator
> from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
> from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
> from airflow.utils.dates import days_ago
> 
> 
> SLACK_CONN_ID = "SLACK_ALERTS"
> AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
> AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
> 
> # today = datetime.datetime.today().strftime('%Y-%m-%d')
> today = date.today()
> 
> s3_bucket = "razor-prod-raw-datalake"
> sqs_client = boto3.client(
>     'sqs',
>     region_name='eu-central-1',
>     aws_access_key_id=AWS_ACCESS_KEY_ID,
>     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
> 
> os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
> os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY
> os.environ['AWS_REGION'] = 'eu-central-1'
> 
> resource = boto3.resource(
>     "dynamodb",
>     region_name="eu-central-1",
>     aws_access_key_id=AWS_ACCESS_KEY_ID,
>     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
> 
> lambda_client = boto3.client(
>     "lambda",
>     region_name="eu-central-1",
>     aws_access_key_id=AWS_ACCESS_KEY_ID,
>     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
> 
> class S3Client:
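>     """Thin wrapper around a boto3 S3 resource for uploads, reads, listing, and bulk deletes."""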
>     def __init__(self):
>         self.s3_client = boto3.resource(
>             's3',
>             aws_access_key_id=AWS_ACCESS_KEY_ID,
>             aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
>         )
> 
>     def put(self, local_file_path, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.upload_file(f'{local_file_path}', s3_bucket, f'{s3_file_path_and_name}')
>     
>     def get(self,s3_bucket,s3_file_path_and_name):
>         s3_object = self.s3_client.meta.client.get_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
>         return s3_object["Body"].read().decode("utf-8")
> 
>     def put_object(self, body, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'), Bucket=s3_bucket, Key=s3_file_path_and_name)
>     
>     def put_object_without_encode(self, body, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.put_object(Body=body, Bucket=s3_bucket, Key=s3_file_path_and_name)
>     
>     def put_object_public(self, body, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'), Bucket=s3_bucket, Key=s3_file_path_and_name, ACL='public-read')
>     
>     def list_files_in_folder(self, s3_bucket, folder_path):
>         try:
>             file_list = []
>             paginator = self.s3_client.meta.client.get_paginator('list_objects_v2')
>             page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=folder_path)
> 
>             for page in page_iterator:
>                 if 'Contents' in page:
>                     for obj in page['Contents']:
>                         file_info = {
>                             'key': obj['Key'],
>                             'size': obj['Size']  # Size in bytes
>                         }
>                         file_list.append(file_info)
> 
>             return file_list
>         except Exception as e:
>             print(f"Error listing files in folder: {e}")
>             return []
>         
>     def delete_file(self, s3_bucket, s3_file_path_and_name):
>         try:
>             response = self.s3_client.meta.client.delete_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
>             return response
>         except Exception as e:
>             print(f"Error deleting file: {e}")
>             return None
> 
>     def delete_folder(self, s3_bucket, folder_path):
>         try:
>             # List all files in the folder
>             file_list = self.list_files_in_folder(s3_bucket, folder_path)
>             if not file_list:
>                 print(f"No files found in folder: {folder_path}")
>                 return []
> 
>             responses = []
>             for file_info in file_list:
>                 response = self.delete_file(s3_bucket, file_info['key'])
>                 responses.append(response)
> 
>             return responses
>         except Exception as e:
>             print(f"Error deleting folder: {e}")
>             return None
>         
> s3_client = S3Client()
> 
> def check_queue_status():
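>     # Poll the SQS queue until it is empty (or a hard 20-hour cap is reached),
>     # then sleep 15 minutes so in-flight Lambda invocations can finish.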
>     sqs = boto3.client(
>         'sqs',
>         region_name='eu-central-1',
>         aws_access_key_id=AWS_ACCESS_KEY_ID,
>         aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
>     )
>     
>     queue_url = 'https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun'
>     max_time_seconds = 3600 * 20  # 20 hours
>     
>     start_time = time.time()
>     while True:
>         # Check if the time limit has been reached
>         elapsed_time = time.time() - start_time
>         if elapsed_time >= max_time_seconds:
>             print("Time limit reached. Exiting loop.")
>             break
>         
>         # Get the approximate number of messages in the queue
>         response = sqs.get_queue_attributes(
>             QueueUrl=queue_url,
>             AttributeNames=['ApproximateNumberOfMessages']
>         )
>         
>         # Check if the queue is empty
>         num_messages = int(response['Attributes']['ApproximateNumberOfMessages'])
>         if num_messages == 0:
>             print("Queue is empty.")
>             break
>         else:
>             print(f"Queue has {num_messages} messages.")
>         
>         # Sleep for a while before checking again
>         time.sleep(15)  # Adjust sleep duration as needed
>     
>     print("Finished checking queue status.")
> 
>     print("Entering 15 min sleep for final Lambdas to be computed.")
>     time.sleep(900)
> 
> def read_s3_parquet(bucket, table_path):
>         # Create a ParquetDataset object to access the Parquet files in the S3 bucket
>         dataset = pq.ParquetDataset(f"s3://{bucket}/{table_path}")
> 
>         # Read the Parquet files and convert them into a Table object
>         table = dataset.read()
> 
>         # Convert the Table object into a Pandas DataFrame
>         df = table.to_pandas()
> 
>         # Return the DataFrame containing the Parquet data
>         return df
> 
> def get_random_filename(s3, bucket_name, folder_path):
> 
>     # List objects in the specified folder
>     response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)
> 
>     # Assuming there's only one file in the folder
>     if 'Contents' in response:
>         return response['Contents'][0]['Key']
> 
>     return None
> 
> def rename_s3_file(s3 , bucket_name, folder_path, new_filename):
> 
>     # Get the random filename from the specified folder
>     old_key = get_random_filename(s3 ,bucket_name, folder_path)
>     print("Old Key" , old_key)
>     if old_key:
>         # Construct the full path for the new file
>         new_key = f"{folder_path}/{new_filename}"
>         print("New Key" , new_key)
>         # Copy the object to the new key
>         s3.copy_object(Bucket=bucket_name, CopySource={'Bucket': bucket_name, 'Key': old_key}, Key=new_key)
> 
>         # Delete the old object
>         s3.delete_object(Bucket=bucket_name, Key=old_key)
>         print(f"File renamed from {old_key} to {new_key}")
>     else:
>         raise ValueError(f"No file found in folder {folder_path}")
>     
> def rename_op():
> 
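>     # Rename the single part file under keyword_presence/{today}/compiled to a
>     # stable name (compiled_op.parquet) so the downstream Redshift COPY can find it.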
>     bucket_name = s3_bucket
>     new_filename = 'compiled_op.parquet'
>     s3_client = boto3.client(
>         "s3",
>         region_name='eu-central-1',
>         aws_access_key_id=AWS_ACCESS_KEY_ID,
>         aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
>     )
>     folder_path = f"keyword_presence/{today}/compiled"    
> 
>     print("Folder Path" , folder_path)
>     rename_s3_file( s3_client , bucket_name, folder_path, new_filename)
> 
> 
> def parellel_send_messages(country):
> 
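>     # For one country: merge the unloaded keyword and listing parquet files,
>     # split the result into 1000-row chunks, write each chunk back to S3, and
>     # enqueue one SQS message per chunk on keywordPresence-prod-recieveCompleteRun.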
>     keyword_folder = s3_client.list_files_in_folder(
>         "razor-prod-raw-datalake",
>         f"content_workflows/unloads/keywords_presence_keywords/country_code={country}",
>     )
>     part = 0
>     output_list = []
> 
>     for file in keyword_folder:
>         try:
>             country_keywords = read_s3_parquet("razor-prod-raw-datalake", f"{file['key']}")
>         except Exception as e:
>             print(e)
>             print(f"Could not find keywords for country {country} with error: {e}")
>             # raise e
>         try:
>             country_listing = read_s3_parquet(
>                 "razor-prod-raw-datalake",
>                 f"content_workflows/unloads/keywords_presence_listing/country_code={country}/000.parquet",
>             )
>         except Exception as e:
>             print(e)
>             print(f"Could not find listing for country {country} with error: {e}")
>             # raise e
>         
> 
>         merged_df = pd.merge(country_listing, country_keywords, on='asin', how='inner')
>         print("keyword count")
>         print(country)
>         
>         x = 0 
> 
>         while x<merged_df.shape[0]:
>     #         print(f"{country} - {x} / {merged_df.shape[0]}")
>             to_push_df = merged_df.iloc[x:x+1000]
>             pandas_dataframe_to_s3(to_push_df, s3_bucket, f'keyword_presence/{today}/inputs', f"{country}_{part}")
>             
>             message_body = {
>                 'bucket': 'razor-prod-raw-datalake',
>                 'output_key': f'keyword_presence/{today}/outputs/{country}_{part}',
>                 'input_key': f'keyword_presence/{today}/inputs/{country}_{part}.parquet',
>                 'country_code' : country,
>                 
>             }
> 
>             resp = sqs_client.send_message(
>                 QueueUrl='https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun',
>                 MessageBody=json.dumps(message_body) 
>             )
>             
>             message_body['resp'] = resp
>             output_list.append(message_body)
>             
> 
>             x+=1000
>             part+=1000
>             print(part)
>             
>     return output_list , f"{country} has completed with {x/1000} calls"
> 
> country_list_3 = ['CA', 'ES', 'FR', 'MX', 'NL', 'PL', 'SE', 'TR', 'DE', 'IT', 'UK', 'US']
> 
> def main(country):
>     output_list , result = parellel_send_messages(country)
>     print(result)
> 
> 
> def pandas_dataframe_to_s3(
>     pandas_df,
>     s3_bucket,
>     s3_path,
>     tablename,
>     access_key=None,
>     secret_key=None
> ):
>     """
>     Uploads a pandas DataFrame to S3 as a parquet file.
> 
>     - pandas_df: The DataFrame to upload.
>     - s3_bucket: The name of the S3 bucket to upload the file to.
>     - s3_path: The path in the S3 bucket where the file will be saved.
>     - tablename: The name of the table or identifier for the parquet file.
>     - access_key: AWS access key (optional; currently unused, the module-level credentials are used).
>     - secret_key: AWS secret key (optional; currently unused, the module-level credentials are used).
>     """
>     # access_key = access_key
>     # secret_key = secret_key
> 
>     df = pandas_df
> 
>     s3_client = boto3.client(
>         's3',
>         aws_access_key_id=AWS_ACCESS_KEY_ID,
>         aws_secret_access_key=AWS_SECRET_ACCESS_KEY
>     )
>     out_buffer = BytesIO()
>     df.to_parquet(out_buffer, index=False)
>     filename = f"{s3_path}/{tablename}.parquet"
>     s3_client.put_object(Bucket=s3_bucket, Key=filename, Body=out_buffer.getvalue())
> #     print('pandas dataframe written to s3 as parquet')
> 
> 
> 
> DEFAULT_ARGS = {
>     "owner": "tech_analytics",
>     "depends_on_past": False,
>     "retries": 0,
>     "email_on_failure": False,
>     "email_on_retry": False,
>     "catchup": False,
>     "redshift_conn_id": "REDSHIFT_FRANKFURT",
>     "postgres_conn_id": "REDSHIFT_FRANKFURT",
>     "conn_id": "REDSHIFT_FRANKFURT",
>     # "on_failure_callback": task_fail_slack_alert,
> }
> 
> with DAG(
>     dag_id="rgbit_integrated_kwdb_static",
>     description="keyword",
>     default_args=DEFAULT_ARGS,
>     # dagrun_timeout=timedelta(hours=1),
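>     # days_ago(1) resolves to midnight UTC of the previous day each time the
>     # DAG file is parsed, so this start_date moves forward over time.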
>     start_date=days_ago(1),
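>     # Cron "0 17 * * 7": 17:00 (UTC by default) every Sunday.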
>     schedule_interval="0 17 * * 7",
>     tags=["keyword","static"],
>     template_searchpath="/usr/local/airflow/include/redshift_queries",
>     catchup=False,  
>     max_active_tasks=2
> ) as dag:
> 
> 
>     
>     country_tasks_3 = []
>     for country in country_list_3:
>         main_fn = PythonOperator(
>             task_id=f'process_{country}_static',
>             python_callable=main,
>             op_kwargs={'country': country},
>             dag=dag,
>             retries=2,  # Add retries for each task
>             retry_delay=timedelta(minutes=5),  # Optionally set a different retry delay
>         )
>         country_tasks_3.append(main_fn)
>     # main_fn = PythonOperator(
>     #     task_id="main_function",
>     #     python_callable=main,
>     #     trigger_rule="all_done",
>     # )
> 
>     wait_for_lambda_1 = PythonOperator(
>         task_id="wait_for_lambda_1",
>         # python_callable=sleeper,
>         python_callable=check_queue_status,
>         op_kwargs={},
>         trigger_rule="all_done",
>         dag=dag
>     )
> 
>     dummy_dag_3 = DummyOperator(task_id='dummy_dag_3', trigger_rule="all_done", dag=dag)
> 
>     dummy_dag_4 = DummyOperator(task_id='dummy_dag_4', trigger_rule="all_done", dag=dag)
> 
> 
>     trigger_glue_static = GlueJobOperator(
>         task_id="trigger_glue_static",
>         job_name="keyword_presence",
>         region_name='eu-central-1',
>         dag=dag,
>     )
> 
> 
>     rename_file_2 = PythonOperator(
>         task_id="rename_file_2",
>         python_callable=rename_op,
>         dag=dag
>     )
> 
>     folder_delete  =  PythonOperator(
>         task_id="folder_delete", 
>         python_callable=s3_client.delete_folder,
>         op_kwargs={'s3_bucket': s3_bucket, 'folder_path': f"content_workflows/unloads/keywords_presence_keywords/"},
>     )
> 
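>     # Redshift UNLOAD: export the static keyword list to S3, partitioned by country_code.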
>     sql_3 = '''
>         unload($$     
>         select distinct asin, country_code , keyword
>         FROM temp_rgbit_keyword_static_list_q2
>         ;
>         
>         $$)
>         to 's3://razor-prod-raw-datalake/content_workflows/unloads/keywords_presence_keywords/'
>         FORMAT PARQUET
>         partition by (country_code)
>         ALLOWOVERWRITE
>         REGION 'eu-central-1'
>         MAXFILESIZE AS 50 MB
>         PARALLEL ON
>         iam_role 'arn:aws:iam::815361800176:role/redshift-unload';
>         '''
> 
>     unload_items_3 = SQLExecuteQueryOperator(
>         task_id=f"unload_items_to_s3_3",
>         sql=sql_3
>     )
> 
> 
> 
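>     # The two SQL blocks below: COPY the compiled parquet into the temp table,
>     # then insert deduplicated rows into the main static table.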
>     data_push_static='''
>             truncate table rgbit_keyword_presence_intermediate_temp_static;
>             COPY rgbit_keyword_presence_intermediate_temp_static
>             FROM 's3://razor-prod-raw-datalake/keyword_presence/{today}/compiled/compiled_op.parquet'
>             iam_role 'arn:aws:iam::815361800176:role/redshift-unload'
>             FORMAT AS parquet
>     '''.format(
>         today = today
>     )
> 
>     data_push_static_1 = '''
>         truncate table public.rgbit_keyword_presence_intermediate_main_static;
>         insert into rgbit_keyword_presence_intermediate_main_static
>         select asin, country_code, keyword, max_column, max_value, duplication_score,
>                field_score, backend_keywords__presence_sum, bp1__presence_sum,
>                bp2__presence_sum, bp3__presence_sum, bp4__presence_sum, bp5__presence_sum,
>                description__presence_sum, title__presence_sum, forbidden_new_keyword,
>                forbidden_keyword_removed_kw, trademark_new_keyword, trademark_keyword_removed_kw
>         from (
>             select *, row_number() over (partition by asin, country_code, keyword
>                                          order by asin, country_code, keyword) as rn
>             from rgbit_keyword_presence_intermediate_temp_static
>         )
>         where rn = 1 
>         '''
> 
>     trunctate_static = SQLExecuteQueryOperator(
>         task_id=f"trunctate_static",
>         sql=data_push_static
>     )
> 
>     push_data_static = SQLExecuteQueryOperator(
>         task_id=f"data_push_static_1",
>         sql=data_push_static_1
>     )
> 
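>     # Task graph: delete stale unload files, unload keywords from Redshift, fan out
>     # per-country pushes, wait for the SQS queue to drain, then run the Glue job,
>     # rename its output, and load the compiled result back into Redshift.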
>     folder_delete >> unload_items_3 >> dummy_dag_3
> 
>     for task in country_tasks_3:
>         dummy_dag_3 >> task >> wait_for_lambda_1
> 
>     wait_for_lambda_1 >> trigger_glue_static >> rename_file_2 >> trunctate_static >> push_data_static
> ``` 

 _Originally posted by @Srichandra-razor in 
[#51651](https://github.com/apache/airflow/issues/51651#issuecomment-2975426622)_

GitHub link: https://github.com/apache/airflow/discussions/60215
