GitHub user Srichandra-razor created a discussion: Airflow DAG scheduling issue
> Hi team,
>
> Apologies — the code I shared earlier was an updated version. I kindly
> request that this issue be reopened for further review.
>
> To clarify, the previous version of the DAG used the days_ago function from
> airflow.utils.dates to set the start_date. This difference matters for the
> scheduling logic and for historical backfill behavior.
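>
> For reference, here is a minimal sketch of the difference between a moving and
> a pinned start_date (the dag_ids and the pendulum-based example are illustrative
> assumptions, not part of either version of our DAG):
>
> ```
> from airflow import DAG
> from airflow.utils.dates import days_ago  # deprecated in newer Airflow releases
> import pendulum
>
> # Older style: days_ago(1) is re-evaluated each time the file is parsed,
> # so the start_date keeps moving forward relative to "now".
> with DAG(
>     dag_id="example_moving_start_date",  # hypothetical dag_id
>     start_date=days_ago(1),
>     schedule_interval="0 17 * * 7",
>     catchup=False,
> ) as moving_dag:
>     pass
>
> # Common replacement: a fixed, timezone-aware start_date, which keeps the
> # schedule and any backfill window stable across parses.
> with DAG(
>     dag_id="example_pinned_start_date",  # hypothetical dag_id
>     start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
>     schedule_interval="0 17 * * 7",
>     catchup=False,
> ) as pinned_dag:
>     pass
> ```
>
> With the moving start_date, the data intervals the scheduler computes can shift
> whenever the DAG file is re-parsed, which is why the two versions can behave
> differently for scheduling and backfills.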
>
> Here is a snippet from the older version of the DAG:
>
> ```
> from multiprocessing import Event
> import os
> from datetime import timedelta,date
> import resource
> import time
> import requests
> from uuid import uuid4 as v4
> from itertools import groupby
> from operator import itemgetter
> from airflow import DAG
> from airflow.models import Variable
> from airflow.models.baseoperator import chain
> from airflow.operators.python_operator import PythonOperator
> # from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
> from airflow.providers.postgres.operators.postgres import PostgresOperator
> from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
> from airflow.utils.dates import days_ago
> import boto3
> import pandas as pd
> from boto3.dynamodb.conditions import Key, Attr
> import json
> from datetime import datetime,date
> import time
> import pandas as pd
> import pyarrow.parquet as pq
> import datetime
> import numpy as np
> import json
> import boto3
> from io import BytesIO
> import os
> from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
> from airflow.operators.dummy_operator import DummyOperator
>
> import concurrent.futures
>
>
> SLACK_CONN_ID = "SLACK_ALERTS"
> AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
> AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
>
> # today = datetime.datetime.today().strftime('%Y-%m-%d')
> today = date.today()
>
> s3_bucket = "razor-prod-raw-datalake"
> sqs_client = boto3.client(
>     'sqs',
>     region_name='eu-central-1',
>     aws_access_key_id=AWS_ACCESS_KEY_ID,
>     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
>
> os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
> os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY
> os.environ['AWS_REGION'] = 'eu-central-1'
>
> resource = boto3.resource(
>     "dynamodb",
>     region_name="eu-central-1",
>     aws_access_key_id=AWS_ACCESS_KEY_ID,
>     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
>
> lambda_client = boto3.client(
>     "lambda",
>     region_name="eu-central-1",
>     aws_access_key_id=AWS_ACCESS_KEY_ID,
>     aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
>
>
> class S3Client:
>     def __init__(self):
>         self.s3_client = boto3.resource(
>             's3',
>             aws_access_key_id=AWS_ACCESS_KEY_ID,
>             aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
>         )
>
>     def put(self, local_file_path, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.upload_file(f'{local_file_path}', s3_bucket, f'{s3_file_path_and_name}')
>
>     def get(self, s3_bucket, s3_file_path_and_name):
>         s3_object = self.s3_client.meta.client.get_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
>         return s3_object["Body"].read().decode("utf-8")
>
>     def put_object(self, body, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'), Bucket=s3_bucket, Key=s3_file_path_and_name)
>
>     def put_object_without_encode(self, body, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.put_object(Body=body, Bucket=s3_bucket, Key=s3_file_path_and_name)
>
>     def put_object_public(self, body, s3_bucket, s3_file_path_and_name):
>         return self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'), Bucket=s3_bucket, Key=s3_file_path_and_name, ACL='public-read')
>
>     def list_files_in_folder(self, s3_bucket, folder_path):
>         try:
>             file_list = []
>             paginator = self.s3_client.meta.client.get_paginator('list_objects_v2')
>             page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=folder_path)
>
>             for page in page_iterator:
>                 if 'Contents' in page:
>                     for obj in page['Contents']:
>                         file_info = {
>                             'key': obj['Key'],
>                             'size': obj['Size']  # Size in bytes
>                         }
>                         file_list.append(file_info)
>
>             return file_list
>         except Exception as e:
>             print(f"Error listing files in folder: {e}")
>             return []
>
>     def delete_file(self, s3_bucket, s3_file_path_and_name):
>         try:
>             response = self.s3_client.meta.client.delete_object(Bucket=s3_bucket, Key=s3_file_path_and_name)
>             return response
>         except Exception as e:
>             print(f"Error deleting file: {e}")
>             return None
>
>     def delete_folder(self, s3_bucket, folder_path):
>         try:
>             # List all files in the folder
>             file_list = self.list_files_in_folder(s3_bucket, folder_path)
>             if not file_list:
>                 print(f"No files found in folder: {folder_path}")
>                 return []
>
>             responses = []
>             for file_info in file_list:
>                 response = self.delete_file(s3_bucket, file_info['key'])
>                 responses.append(response)
>
>             return responses
>         except Exception as e:
>             print(f"Error deleting folder: {e}")
>             return None
>
>
> s3_client = S3Client()
>
>
> def check_queue_status():
>     sqs = boto3.client(
>         'sqs',
>         region_name='eu-central-1',
>         aws_access_key_id=AWS_ACCESS_KEY_ID,
>         aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
>     )
>
>     queue_url = 'https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun'
>     max_time_seconds = 3600 * 20  # 20 hours
>
>     start_time = time.time()
>     while True:
>         # Check if the time limit has been reached
>         elapsed_time = time.time() - start_time
>         if elapsed_time >= max_time_seconds:
>             print("Time limit reached. Exiting loop.")
>             break
>
>         # Get the approximate number of messages in the queue
>         response = sqs.get_queue_attributes(
>             QueueUrl=queue_url,
>             AttributeNames=['ApproximateNumberOfMessages']
>         )
>
>         # Check if the queue is empty
>         num_messages = int(response['Attributes']['ApproximateNumberOfMessages'])
>         if num_messages == 0:
>             print("Queue is empty.")
>             break
>         else:
>             print(f"Queue has {num_messages} messages.")
>
>         # Sleep for a while before checking again
>         time.sleep(15)  # Adjust sleep duration as needed
>
>     print("Finished checking queue status.")
>
>     print("Entering 15 min sleep for final Lambdas to be computed.")
>     time.sleep(900)
>
>
> def read_s3_parquet(bucket, table_path):
>     # Create a ParquetDataset object to access the Parquet files in the S3 bucket
>     dataset = pq.ParquetDataset(f"s3://{bucket}/{table_path}")
>
>     # Read the Parquet files and convert them into a Table object
>     table = dataset.read()
>
>     # Convert the Table object into a Pandas DataFrame
>     df = table.to_pandas()
>
>     # Return the DataFrame containing the Parquet data
>     return df
>
>
> def get_random_filename(s3, bucket_name, folder_path):
>
>     # List objects in the specified folder
>     response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)
>
>     # Assuming there's only one file in the folder
>     if 'Contents' in response:
>         return response['Contents'][0]['Key']
>
>     return None
>
>
> def rename_s3_file(s3, bucket_name, folder_path, new_filename):
>
>     # Get the random filename from the specified folder
>     old_key = get_random_filename(s3, bucket_name, folder_path)
>     print("Old Key", old_key)
>     if old_key:
>         # Construct the full path for the new file
>         new_key = f"{folder_path}/{new_filename}"
>         print("New Key", new_key)
>         # Copy the object to the new key
>         s3.copy_object(Bucket=bucket_name, CopySource={'Bucket': bucket_name, 'Key': old_key}, Key=new_key)
>
>         # Delete the old object
>         s3.delete_object(Bucket=bucket_name, Key=old_key)
>         print(f"File renamed from {old_key} to {new_key}")
>     else:
>         raise ValueError(f"No file found in folder {folder_path}")
>
>
> def rename_op():
>
>     bucket_name = s3_bucket
>     new_filename = 'compiled_op.parquet'
>     s3_client = boto3.client(
>         "s3",
>         region_name='eu-central-1',
>         aws_access_key_id=AWS_ACCESS_KEY_ID,
>         aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
>     )
>     folder_path = f"keyword_presence/{today}/compiled"
>
>     print("Folder Path", folder_path)
>     rename_s3_file(s3_client, bucket_name, folder_path, new_filename)
>
>
> def parellel_send_messages(country):
>
>     keyword_folder = s3_client.list_files_in_folder(
>         "razor-prod-raw-datalake",
>         f"content_workflows/unloads/keywords_presence_keywords/country_code={country}"
>     )
>     part = 0
>     for file in keyword_folder:
>
>         output_list = []
>         try:
>             country_keywords = read_s3_parquet("razor-prod-raw-datalake", f"{file['key']}")
>         except Exception as e:
>             print(e)
>             print(f'Could not find keywords for country : {country} with error {e}')
>             # raise e
>         try:
>             country_listing = read_s3_parquet(
>                 "razor-prod-raw-datalake",
>                 f"content_workflows/unloads/keywords_presence_listing/country_code={country}/000.parquet"
>             )
>         except Exception as e:
>             print(e)
>             print(f'Could not find listing for country : {country} with error {e}')
>             # raise e
>
>         merged_df = pd.merge(country_listing, country_keywords, on='asin', how='inner')
>         print("keyword count")
>         print(country)
>
>         x = 0
>
>         while x < merged_df.shape[0]:
>             # print(f"{country} - {x} / {merged_df.shape[0]}")
>             to_push_df = merged_df.iloc[x:x+1000]
>
>             pandas_dataframe_to_s3(to_push_df, s3_bucket, f'keyword_presence/{today}/inputs', f"{country}_{part}")
>
>             message_body = {
>                 'bucket': 'razor-prod-raw-datalake',
>                 'output_key': f'keyword_presence/{today}/outputs/{country}_{part}',
>                 'input_key': f'keyword_presence/{today}/inputs/{country}_{part}.parquet',
>                 'country_code': country,
>             }
>
>             resp = sqs_client.send_message(
>                 QueueUrl='https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun',
>                 MessageBody=json.dumps(message_body)
>             )
>
>             message_body['resp'] = resp
>             output_list.append(message_body)
>
>             x += 1000
>             part += 1000
>             print(part)
>
>     return output_list, f"{country} has completed with {x/1000} calls"
>
>
> country_list_3 = ['CA', 'ES', 'FR', 'MX', 'NL', 'PL', 'SE', 'TR', 'DE', 'IT', 'UK', 'US']
>
>
> def main(country):
>     output_list, result = parellel_send_messages(country)
>     print(result)
>
>
> def pandas_dataframe_to_s3(
>     pandas_df,
>     s3_bucket,
>     s3_path,
>     tablename,
>     access_key=None,
>     secret_key=None
> ):
>     """
>     Uploads a pandas DataFrame to S3 as a parquet file.
>
>     - pandas_df: The DataFrame to upload.
>     - s3_bucket: The name of the S3 bucket to upload the file to.
>     - s3_path: The path in the S3 bucket where the file will be saved.
>     - tablename: The name of the table or identifier for the parquet file.
>     - access_key: AWS access key (optional, defaults to class attribute if not provided).
>     - secret_key: AWS secret key (optional, defaults to class attribute if not provided).
>     """
>     # access_key = access_key
>     # secret_key = secret_key
>
>     df = pandas_df
>
>     s3_client = boto3.client(
>         's3',
>         aws_access_key_id=AWS_ACCESS_KEY_ID,
>         aws_secret_access_key=AWS_SECRET_ACCESS_KEY
>     )
>     out_buffer = BytesIO()
>     df.to_parquet(out_buffer, index=False)
>     filename = f"{s3_path}/{tablename}.parquet"
>     s3_client.put_object(Bucket=s3_bucket, Key=filename, Body=out_buffer.getvalue())
>     # print('pandas dataframe written to s3 as parquet')
>
>
> DEFAULT_ARGS = {
>     "owner": "tech_analytics",
>     "depends_on_past": False,
>     "retries": 0,
>     "email_on_failure": False,
>     "email_on_retry": False,
>     "catchup": False,
>     "redshift_conn_id": "REDSHIFT_FRANKFURT",
>     "postgres_conn_id": "REDSHIFT_FRANKFURT",
>     "conn_id": "REDSHIFT_FRANKFURT",
>     # "on_failure_callback": task_fail_slack_alert,
> }
>
> with DAG(
>     dag_id="rgbit_integrated_kwdb_static",
>     description="keyword",
>     default_args=DEFAULT_ARGS,
>     # dagrun_timeout=timedelta(hours=1),
>     start_date=days_ago(1),
>     schedule_interval="0 17 * * 7",
>     tags=["keyword", "static"],
>     template_searchpath="/usr/local/airflow/include/redshift_queries",
>     catchup=False,
>     max_active_tasks=2
> ) as dag:
>
>     country_tasks_3 = []
>     for country in country_list_3:
>         main_fn = PythonOperator(
>             task_id=f'process_{country}_static',
>             python_callable=main,
>             op_kwargs={'country': country},
>             dag=dag,
>             retries=2,  # Add retries for each task
>             retry_delay=timedelta(minutes=5)  # Optionally set a different retry delay
>         )
>         country_tasks_3.append(main_fn)
>     # main_fn = PythonOperator(
>     #     task_id="main_function",
>     #     python_callable=main,
>     #     trigger_rule="all_done",
>     # )
>
>     wait_for_lambda_1 = PythonOperator(
>         task_id="wait_for_lambda_1",
>         # python_callable=sleeper,
>         python_callable=check_queue_status,
>         op_kwargs={},
>         trigger_rule="all_done",
>         dag=dag
>     )
>
>     dummy_dag_3 = DummyOperator(task_id='dummy_dag_3', trigger_rule="all_done", dag=dag)
>
>     dummy_dag_4 = DummyOperator(task_id='dummy_dag_4', trigger_rule="all_done", dag=dag)
>
>     trigger_glue_static = GlueJobOperator(
>         task_id="trigger_glue_static",
>         job_name="keyword_presence",
>         region_name='eu-central-1',
>         dag=dag,
>     )
>
>     rename_file_2 = PythonOperator(
>         task_id="rename_file_2",
>         python_callable=rename_op,
>         dag=dag
>     )
>
>     folder_delete = PythonOperator(
>         task_id="folder_delete",
>         python_callable=s3_client.delete_folder,
>         op_kwargs={'s3_bucket': s3_bucket, 'folder_path': f"content_workflows/unloads/keywords_presence_keywords/"},
>     )
>
>     sql_3 = '''
>     unload($$
>     select distinct asin, country_code , keyword
>     FROM temp_rgbit_keyword_static_list_q2
>     ;
>
>     $$)
>     to 's3://razor-prod-raw-datalake/content_workflows/unloads/keywords_presence_keywords/'
>     FORMAT PARQUET
>     partition by (country_code)
>     ALLOWOVERWRITE
>     REGION 'eu-central-1'
>     MAXFILESIZE AS 50 MB
>     PARALLEL ON
>     iam_role 'arn:aws:iam::815361800176:role/redshift-unload';
>     '''
>
>     unload_items_3 = SQLExecuteQueryOperator(
>         task_id=f"unload_items_to_s3_3",
>         sql=sql_3
>     )
>
>     data_push_static = '''
>     truncate table rgbit_keyword_presence_intermediate_temp_static;
>     COPY rgbit_keyword_presence_intermediate_temp_static
>     FROM 's3://razor-prod-raw-datalake/keyword_presence/{today}/compiled/compiled_op.parquet'
>     iam_role 'arn:aws:iam::815361800176:role/redshift-unload'
>     FORMAT AS parquet
>     '''.format(
>         today=today
>     )
>
>     data_push_static_1 = '''
>     truncate table public.rgbit_keyword_presence_intermediate_main_static;
>     insert into rgbit_keyword_presence_intermediate_main_static
>     select asin, country_code, keyword, max_column, max_value, duplication_score,
>            field_score, backend_keywords__presence_sum, bp1__presence_sum,
>            bp2__presence_sum, bp3__presence_sum, bp4__presence_sum, bp5__presence_sum,
>            description__presence_sum, title__presence_sum, forbidden_new_keyword,
>            forbidden_keyword_removed_kw, trademark_new_keyword,
>            trademark_keyword_removed_kw
>     from (select *, row_number() over(partition by asin, country_code, keyword order by asin, country_code, keyword) as rn
>           from rgbit_keyword_presence_intermediate_temp_static)
>     where rn = 1
>     '''
>
>     trunctate_static = SQLExecuteQueryOperator(
>         task_id=f"trunctate_static",
>         sql=data_push_static
>     )
>
>     push_data_static = SQLExecuteQueryOperator(
>         task_id=f"data_push_static_1",
>         sql=data_push_static_1
>     )
>
>     folder_delete >> unload_items_3 >> dummy_dag_3
>
>     for task in country_tasks_3:
>         dummy_dag_3 >> task >> wait_for_lambda_1
>
>     wait_for_lambda_1 >> trigger_glue_static >> rename_file_2 >> trunctate_static >> push_data_static
> ```
_Originally posted by @Srichandra-razor in
[#51651](https://github.com/apache/airflow/issues/51651#issuecomment-2975426622)_
GitHub link: https://github.com/apache/airflow/discussions/60215