Srichandra-razor opened a new issue, #51771:
URL: https://github.com/apache/airflow/issues/51771
> Hi team,
>
> Apologies — the code I shared earlier was an updated version. I kindly
request that this issue be reopened for further review.
>
> To clarify, the previous version of the DAG uses the days_ago function
from airflow.utils.dates to set the start_date. This change is relevant for
scheduling logic and historical backfill behavior.
>
> Here is a snippet from the older version of the DAG:
>
> ```
> from multiprocessing import Event
> import os
> from datetime import timedelta,date
> import resource
> import time
> import requests
> from uuid import uuid4 as v4
> from itertools import groupby
> from operator import itemgetter
> from airflow import DAG
> from airflow.models import Variable
> from airflow.models.baseoperator import chain
> from airflow.operators.python_operator import PythonOperator
> # from airflow.providers.amazon.aws.operators.redshift_sql import
RedshiftSQLOperator
> from airflow.providers.postgres.operators.postgres import PostgresOperator
> from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
> from airflow.utils.dates import days_ago
> import boto3
> import pandas as pd
> from boto3.dynamodb.conditions import Key, Attr
> import json
> from datetime import datetime,date
> import time
> import pandas as pd
> import pyarrow.parquet as pq
> import datetime
> import numpy as np
> import json
> import boto3
> from io import BytesIO
> import os
> from airflow.providers.common.sql.operators.sql import
SQLExecuteQueryOperator
> from airflow.operators.dummy_operator import DummyOperator
>
>
> import concurrent.futures
>
>
> SLACK_CONN_ID = "SLACK_ALERTS"
> AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
> AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
>
> # today = datetime.datetime.today().strftime('%Y-%m-%d')
> today = date.today()
>
> s3_bucket = "razor-prod-raw-datalake"
> sqs_client = boto3.client('sqs' , region_name = 'eu-central-1'
,aws_access_key_id=AWS_ACCESS_KEY_ID,
> aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
>
> os.environ['AWS_ACCESS_KEY_ID'] = AWS_ACCESS_KEY_ID
> os.environ['AWS_SECRET_ACCESS_KEY'] = AWS_SECRET_ACCESS_KEY
> os.environ['AWS_REGION'] = 'eu-central-1'
>
> resource = boto3.resource(
> "dynamodb",
> region_name="eu-central-1",
> aws_access_key_id=AWS_ACCESS_KEY_ID,
> aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
>
> lambda_client = boto3.client(
> "lambda",
> region_name="eu-central-1",
> aws_access_key_id=AWS_ACCESS_KEY_ID,
> aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
> )
>
> class S3Client:
> def __init__(self):
> self.s3_client = boto3.resource('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
>
> def put(self, local_file_path, s3_bucket, s3_file_path_and_name):
> return
self.s3_client.meta.client.upload_file(f'{local_file_path}', s3_bucket,
f'{s3_file_path_and_name}')
>
> def get(self,s3_bucket,s3_file_path_and_name):
> s3_object =
self.s3_client.meta.client.get_object(Bucket=s3_bucket,Key=s3_file_path_and_name)
> return s3_object["Body"].read().decode("utf-8")
>
> def put_object(self, body, s3_bucket, s3_file_path_and_name):
> return
self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'),Bucket=s3_bucket,Key=s3_file_path_and_name)
>
> def put_object_without_encode(self, body, s3_bucket,
s3_file_path_and_name):
> return
self.s3_client.meta.client.put_object(Body=body,Bucket=s3_bucket,Key=s3_file_path_and_name)
>
> def put_object_public(self, body, s3_bucket, s3_file_path_and_name):
> return
self.s3_client.meta.client.put_object(Body=body.encode('UTF-8'),Bucket=s3_bucket,Key=s3_file_path_and_name,ACL='public-read')
>
> def list_files_in_folder(self, s3_bucket, folder_path):
> try:
> file_list = []
> paginator =
self.s3_client.meta.client.get_paginator('list_objects_v2')
> page_iterator = paginator.paginate(Bucket=s3_bucket,
Prefix=folder_path)
>
> for page in page_iterator:
> if 'Contents' in page:
> for obj in page['Contents']:
> file_info = {
> 'key': obj['Key'],
> 'size': obj['Size'] # Size in bytes
> }
> file_list.append(file_info)
>
> return file_list
> except Exception as e:
> print(f"Error listing files in folder: {e}")
> return []
>
> def delete_file(self, s3_bucket, s3_file_path_and_name):
> try:
> response =
self.s3_client.meta.client.delete_object(Bucket=s3_bucket,
Key=s3_file_path_and_name)
> return response
> except Exception as e:
> print(f"Error deleting file: {e}")
> return None
>
> def delete_folder(self, s3_bucket, folder_path):
> try:
> # List all files in the folder
> file_list = self.list_files_in_folder(s3_bucket, folder_path)
> if not file_list:
> print(f"No files found in folder: {folder_path}")
> return []
>
> responses = []
> for file_info in file_list:
> response = self.delete_file(s3_bucket, file_info['key'])
> responses.append(response)
>
> return responses
> except Exception as e:
> print(f"Error deleting folder: {e}")
> return None
>
> s3_client = S3Client()
>
> def check_queue_status():
> sqs =
boto3.client('sqs',region_name='eu-central-1',aws_access_key_id=AWS_ACCESS_KEY_ID,
> aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
>
> queue_url =
'https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun'
> max_time_seconds = 3600 * 20 # 5 hours
>
> start_time = time.time()
> while True:
> # Check if the time limit has been reached
> elapsed_time = time.time() - start_time
> if elapsed_time >= max_time_seconds:
> print("Time limit reached. Exiting loop.")
> break
>
> # Get the approximate number of messages in the queue
> response = sqs.get_queue_attributes(
> QueueUrl=queue_url,
> AttributeNames=['ApproximateNumberOfMessages']
> )
>
> # Check if the queue is empty
> num_messages =
int(response['Attributes']['ApproximateNumberOfMessages'])
> if num_messages == 0:
> print("Queue is empty.")
> break
> else:
> print(f"Queue has {num_messages} messages.")
>
> # Sleep for a while before checking again
> time.sleep(15) # Adjust sleep duration as needed
>
> print("Finished checking queue status.")
>
> print("Entering 15 min sleep for final Lambdas to be computed.")
> time.sleep(900)
>
> def read_s3_parquet(bucket, table_path):
> # Create a ParquetDataset object to access the Parquet files in
the S3 bucket
> dataset = pq.ParquetDataset(f"s3://{bucket}/{table_path}")
>
> # Read the Parquet files and convert them into a Table object
> table = dataset.read()
>
> # Convert the Table object into a Pandas DataFrame
> df = table.to_pandas()
>
> # Return the DataFrame containing the Parquet data
> return df
>
> def get_random_filename(s3, bucket_name, folder_path):
>
> # List objects in the specified folder
> response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)
>
> # Assuming there's only one file in the folder
> if 'Contents' in response:
> return response['Contents'][0]['Key']
>
> return None
>
> def rename_s3_file(s3 , bucket_name, folder_path, new_filename):
>
> # Get the random filename from the specified folder
> old_key = get_random_filename(s3 ,bucket_name, folder_path)
> print("Old Key" , old_key)
> if old_key:
> # Construct the full path for the new file
> new_key = f"{folder_path}/{new_filename}"
> print("New Key" , new_key)
> # Copy the object to the new key
> s3.copy_object(Bucket=bucket_name, CopySource={'Bucket':
bucket_name, 'Key': old_key}, Key=new_key)
>
> # Delete the old object
> s3.delete_object(Bucket=bucket_name, Key=old_key)
> print(f"File renamed from {old_key} to {new_key}")
> else:
> raise ValueError(f"No file found in folder {folder_path}")
>
> def rename_op():
>
> bucket_name = s3_bucket
> new_filename = 'compiled_op.parquet'
> s3_client = boto3.client("s3",region_name='eu-central-1',
aws_access_key_id=AWS_ACCESS_KEY_ID , aws_secret_access_key =
AWS_SECRET_ACCESS_KEY )
> folder_path = f"keyword_presence/{today}/compiled"
>
> print("Folder Path" , folder_path)
> rename_s3_file( s3_client , bucket_name, folder_path, new_filename)
>
>
> def parellel_send_messages(country):
>
> keyword_folder =
s3_client.list_files_in_folder("razor-prod-raw-datalake",
f"content_workflows/unloads/keywords_presence_keywords/country_code={country}")
> part = 0
> for file in keyword_folder:
>
> output_list = []
> try:
> country_keywords = read_s3_parquet("razor-prod-raw-datalake",
f"{file['key']}")
> except Exception as e:
> print(e)
> print(f'Could not find keywords for country : {country} with
error' + e)
> # raise e
> try:
> country_listing = read_s3_parquet("razor-prod-raw-datalake",
f"content_workflows/unloads/keywords_presence_listing/country_code={country}/000.parquet")
> except Exception as e:
> print(e)
> print(f'Could not find listing for country : {country} with
error' + e)
> # raise e
>
>
> merged_df = pd.merge(country_listing, country_keywords, on='asin',
how='inner')
> print("keyword count")
> print(country)
>
> x = 0
>
> while x<merged_df.shape[0]:
> # print(f"{country} - {x} / {merged_df.shape[0]}")
> to_push_df = merged_df.iloc[x:x+1000]
>
pandas_dataframe_to_s3(to_push_df,s3_bucket,f'keyword_presence/{today}/inputs',f"{country}_{part}")
>
> message_body = {
> 'bucket': 'razor-prod-raw-datalake',
> 'output_key' :
f'keyword_presence/{today}/outputs/{country}_{part}',
> 'input_key' :
f'keyword_presence/{today}/inputs/{country}_{part}.parquet',
> 'country_code' : country,
>
> }
>
> resp = sqs_client.send_message(
>
QueueUrl='https://sqs.eu-central-1.amazonaws.com/815361800176/keywordPresence-prod-recieveCompleteRun',
> MessageBody=json.dumps(message_body)
> )
>
> message_body['resp'] = resp
> output_list.append(message_body)
>
>
> x+=1000
> part+=1000
> print(part)
>
> return output_list , f"{country} has completed with {x/1000} calls"
>
> country_list_3 = [ 'CA' , 'ES' , 'FR' , 'MX' , 'NL' , 'PL' , 'SE' , 'TR',
'DE' , 'IT' , 'UK' , 'US' ]
>
> def main(country):
> output_list , result = parellel_send_messages(country)
> print(result)
>
>
> def pandas_dataframe_to_s3(
> pandas_df,
> s3_bucket,
> s3_path,
> tablename,
> access_key=None,
> secret_key=None
> ):
> """
> Uploads a pandas DataFrame to S3 as a parquet file.
>
> - pandas_df: The DataFrame to upload.
> - s3_bucket: The name of the S3 bucket to upload the file to.
> - s3_path: The path in the S3 bucket where the file will be saved.
> - tablename: The name of the table or identifier for the parquet file.
> - access_key: AWS access key (optional, defaults to class attribute if
not provided).
> - secret_key: AWS secret key (optional, defaults to class attribute if
not provided).
> """
> # access_key = access_key
> # secret_key = secret_key
>
> df = pandas_df
>
> s3_client = boto3.client(
> 's3',
> aws_access_key_id=AWS_ACCESS_KEY_ID,
> aws_secret_access_key=AWS_SECRET_ACCESS_KEY
> )
> out_buffer = BytesIO()
> df.to_parquet(out_buffer, index=False)
> filename = f"{s3_path}/{tablename}.parquet"
> s3_client.put_object(Bucket=s3_bucket, Key=filename,
Body=out_buffer.getvalue())
> # print('pandas dataframe written to s3 as parquet')
>
>
>
> DEFAULT_ARGS = {
> "owner": "tech_analytics",
> "depends_on_past": False,
> "retries": 0,
> "email_on_failure": False,
> "email_on_retry": False,
> "catchup": False,
> "redshift_conn_id": "REDSHIFT_FRANKFURT",
> "postgres_conn_id": "REDSHIFT_FRANKFURT",
> "conn_id": "REDSHIFT_FRANKFURT",
> # "on_failure_callback": task_fail_slack_alert,
> }
>
> with DAG(
> dag_id="rgbit_integrated_kwdb_static",
> description="keyword",
> default_args=DEFAULT_ARGS,
> # dagrun_timeout=timedelta(hours=1),
> start_date=days_ago(1),
> schedule_interval="0 17 * * 7",
> tags=["keyword","static"],
> template_searchpath="/usr/local/airflow/include/redshift_queries",
> catchup=False,
> max_active_tasks=2
> ) as dag:
>
>
>
> country_tasks_3 = []
> for country in country_list_3:
> main_fn = PythonOperator(
> task_id=f'process_{country}_static',
> python_callable=main,
> op_kwargs={'country': country},
> dag=dag,
> retries=2, # Add retries for each task
> retry_delay=timedelta(minutes=5) # Optionally set a different
retry delay
>
> )
> country_tasks_3.append(main_fn)
> # main_fn = PythonOperator(
> # task_id="main_function",
> # python_callable=main,
> # trigger_rule="all_done",
> # )
>
> wait_for_lambda_1 = PythonOperator(
> task_id="wait_for_lambda_1",
> # python_callable=sleeper,
> python_callable=check_queue_status,
> op_kwargs={},
> trigger_rule="all_done",
> dag=dag
> )
>
> dummy_dag_3 =
DummyOperator(task_id='dummy_dag_3',trigger_rule="all_done", dag=dag)
>
> dummy_dag_4 =
DummyOperator(task_id='dummy_dag_4',trigger_rule="all_done", dag=dag)
>
>
> trigger_glue_static = GlueJobOperator(
> task_id="trigger_glue_static",
> job_name="keyword_presence",
> region_name='eu-central-1',
> dag=dag,
> )
>
>
> rename_file_2 = PythonOperator(
> task_id="rename_file_2",
> python_callable=rename_op,
> dag=dag
> )
>
> folder_delete = PythonOperator(
> task_id="folder_delete",
> python_callable=s3_client.delete_folder,
> op_kwargs={'s3_bucket': s3_bucket, 'folder_path':
f"content_workflows/unloads/keywords_presence_keywords/"},
> )
>
> sql_3 = '''
> unload($$
> select distinct asin, country_code , keyword
> FROM temp_rgbit_keyword_static_list_q2
> ;
>
> $$)
> to
's3://razor-prod-raw-datalake/content_workflows/unloads/keywords_presence_keywords/'
> FORMAT PARQUET
> partition by (country_code)
> ALLOWOVERWRITE
> REGION 'eu-central-1'
> MAXFILESIZE AS 50 MB
> PARALLEL ON
> iam_role 'arn:aws:iam::815361800176:role/redshift-unload';
> '''
>
> unload_items_3 = SQLExecuteQueryOperator(
> task_id=f"unload_items_to_s3_3",
> sql=sql_3
> )
>
>
>
> data_push_static='''
> truncate table rgbit_keyword_presence_intermediate_temp_static;
> COPY rgbit_keyword_presence_intermediate_temp_static
> FROM
's3://razor-prod-raw-datalake/keyword_presence/{today}/compiled/compiled_op.parquet'
> iam_role 'arn:aws:iam::815361800176:role/redshift-unload'
> FORMAT AS parquet
> '''.format(
> today = today
> )
>
> data_push_static_1 = '''
> truncate table
public.rgbit_keyword_presence_intermediate_main_static;
> insert into rgbit_keyword_presence_intermediate_main_static select
asin, country_code, keyword, max_column, max_value, duplication_score,
field_score, backend_keywords__presence_sum, bp1__presence_sum,
bp2__presence_sum, bp3__presence_sum, bp4__presence_sum, bp5__presence_sum,
description__presence_sum, title__presence_sum, forbidden_new_keyword,
forbidden_keyword_removed_kw, trademark_new_keyword,
trademark_keyword_removed_kw
> from (select * ,row_number() over(partition by
asin,country_code,keyword order by asin,country_code,keyword) as rn from
rgbit_keyword_presence_intermediate_temp_static)
> where rn = 1
> '''
>
> trunctate_static = SQLExecuteQueryOperator(
> task_id=f"trunctate_static",
> sql=data_push_static
> )
>
> push_data_static = SQLExecuteQueryOperator(
> task_id=f"data_push_static_1",
> sql=data_push_static_1
> )
>
> folder_delete >> unload_items_3 >> dummy_dag_3
>
> for task in country_tasks_3:
> dummy_dag_3 >> task >> wait_for_lambda_1
>
> wait_for_lambda_1 >> trigger_glue_static >> rename_file_2 >>
trunctate_static >> push_data_static
> ```
_Originally posted by @Srichandra-razor in
[#51651](https://github.com/apache/airflow/issues/51651#issuecomment-2975426622)_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]