jaetma commented on issue #33688:
URL: https://github.com/apache/airflow/issues/33688#issuecomment-1694910574

   Hello @sunank200 
   
   This is the DAG; it's just a simple query insertion:
   
   ```
   from airflow import DAG
   from airflow.utils.dates import days_ago
   from airflow.operators.python_operator import PythonOperator
   from airflow.providers.postgres.operators.postgres import PostgresOperator
   from datetime import timedelta, datetime
   import logging
   import os
   
   # Module-level logger bound to the "airflow.task" logger name.
   logger = logging.getLogger("airflow.task")
   
   # NOTE(review): not referenced anywhere else in the visible DAG code.
   default_run_days = 1
   
   # Schedule per deployment environment. Every environment is None here;
   # presumably the DAG is only started by external triggers that pass a conf
   # payload (fetch_header reads dag_run.conf) -- confirm.
   interval = {
       'dev': None,
       'qa': None,
       'prod': None
   }
   
   # Notification recipients per deployment environment (addresses were
   # redacted by the author of this post).
   emails = {
       'dev': ['{ censored }'],
       'qa': ['{ censored }'],
       'prod': ['{ censored }']
   }
   
   # default arguments
   # Built at import time: os.environ['BBR_ENVIRONMENT'] must be set and must be
   # one of the keys of `emails`, otherwise parsing this file raises KeyError.
   default_args = {
       'owner': 'admin',
       'depends_on_past': False,
       # Re-evaluated every time this file is parsed.
       'start_date': days_ago(0),
       'email': emails[os.environ['BBR_ENVIRONMENT']],
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 0,
       'retry_delay': timedelta(minutes=2)
   }
   
   def on_failure_callback(context):
       """Send a failure report for the failed task through SendGrid.

       Args:
           context: Callback context dict. ``context['task_instance']`` is
               required; ``context.get('exception')`` is optional.

       Environment:
           BBR_ENVIRONMENT: selects the recipient list from ``emails`` and is
               echoed in the subject and body.
           SENDGRID_MAIL_FROM: sender address.
           SENDGRID_API_KEY: path of a file whose content is the API key.

       Raises:
           KeyError: if ``task_instance`` or ``BBR_ENVIRONMENT`` is missing.
           Any error raised while reading the key file or sending the mail.
       """
       from sendgrid import SendGridAPIClient
       from sendgrid.helpers.mail import Mail
       import traceback

       ti = context['task_instance']
       print("FAILURE CALLBACK")

       exception = context.get('exception')

       if isinstance(exception, BaseException):
           # Positional arguments work on every Python 3 release; the
           # ``etype=`` keyword was removed in Python 3.10.
           formatted_exception = ''.join(
               traceback.format_exception(
                   type(exception), exception, exception.__traceback__
               )
           ).strip()
       else:
           # The context may carry no exception (None) or a non-exception
           # value; report it as text instead of failing on __traceback__.
           formatted_exception = str(exception)

       environment = os.environ['BBR_ENVIRONMENT']

       message = Mail(
           from_email=os.environ.get('SENDGRID_MAIL_FROM'),
           to_emails=emails[environment],
           subject=f'Airflow error: {ti.dag_id}, Env:' + str(environment),
           html_content=f"""
           <p>ENV: {str(environment)}</p>
           <p>DAG ID: {ti.dag_id}</p>
           <p>TASK ID: {ti.task_id}</p>
           <p>DATE: {str(datetime.now())}</p>
           <p>EXCEPTION: {formatted_exception}</p>
           """
       )

       # Read the key through a context manager so the file handle is closed.
       with open(os.environ.get('SENDGRID_API_KEY')) as key_file:
           api_key = key_file.read().replace('\n', '')

       sg = SendGridAPIClient(api_key)
       sg.send(message)
   
   # initializing dag
   # The schedule is looked up per environment at import time, so
   # os.environ['BBR_ENVIRONMENT'] must be a key of `interval`.
   dag = DAG(
       'portalle_sale_transaction',
       default_args=default_args,
       catchup=False,
       schedule_interval=interval[os.environ['BBR_ENVIRONMENT']],
       max_active_runs=20,
       tags=['portalle'],
   )
   
   def fetch_header(ti, **kwargs):
       """Flatten the triggering run's ``conf`` payload into a ``trx_*`` record.

       Args:
           ti: Task instance passed by the operator; not used here.
           **kwargs: Task context. ``kwargs["dag_run"].conf`` must hold the
               nested payload with the ``acquirer``, ``merchant``,
               ``transaction``, ``status``, ``payment``, ``agoraweb`` and
               ``metadata`` sections.

       Returns:
           dict: One flat record keyed by ``trx_*`` names, in a fixed key order.

       Raises:
           KeyError: if a section or field read below is missing.
       """
       record = kwargs["dag_run"].conf
       print(record)

       acquirer = record['acquirer']
       merchant = record['merchant']
       transaction = record['transaction']
       status = record['status']
       payment = record['payment']
       agoraweb = record['agoraweb']
       metadata = record['metadata']

       def format_epoch(epoch_seconds, fmt='%Y-%m-%d %H:%M:%S'):
           """Render epoch seconds as naive-UTC text using ``fmt``."""
           return datetime.utcfromtimestamp(epoch_seconds).strftime(fmt)

       # Each timestamp is formatted once and reused for every column that
       # carries it.
       requested_text = format_epoch(transaction['requested_at'])
       accounting_day = format_epoch(transaction['requested_at'], '%Y-%m-%d 00:00:00')
       # accounted_at is optional: a falsy value yields None, not a date.
       accounted_at = transaction['accounted_at']
       accounted_text = format_epoch(accounted_at) if accounted_at else None

       output_json = {
           "trx_id": 1,
           "trx_tipo_trx": acquirer['transaction_type_code'],
           "trx_emisor": acquirer['acquirer_code'],
           "trx_comercio": merchant['merchant_code'],
           "trx_local": merchant['store_code'],
           "trx_pos": merchant['terminal_code'],
           "trx_pais": merchant['country'],
           "trx_boleta": transaction['document_number'],
           "trx_fecha": requested_text,
           "trx_hora": requested_text,
           "trx_numero": transaction['transaction_number'],
           "trx_estado": status['status_code'],
           "trx_cod_rechazo": status['rejected_code'],
           "trx_glosa_rech": status['rejected_description'],
           "trx_estado_portal": status['portal'],
           "trx_version": status['version'],
           "trx_obs": status['obs'],
           "trx_tarjeta": acquirer['card_type_code'],
           "trx_monto": payment['amount'],
           "trx_cuotas": payment['installments'],
           # Fixed at 0; taking the last 4 characters of
           # record['payment']['card_number'] ([-4:]) is disabled.
           "trx_ult4_dig": 0,
           "trx_ts_req": accounted_text,
           "trx_ts_rsp": accounted_text,
           "trx_numtarjeta": payment['card_number'],
           "trx_codautor": payment['authorization_code'],
           "trx_mpago": payment['m'],
           "trx_pin": payment['pin'],
           "trx_cadena": merchant['channel_code'],
           "trx_vend_caj": merchant['operator_code'],
           "trx_fechacont": accounting_day,
           "trx_oc_merchant_code": agoraweb['purchase_order_merchant_code'],
           "trx_oc_merchant_description": agoraweb['purchase_order_merchant_description'],
           "trx_oc_order_code": agoraweb['purchase_order_code'],
           "trx_oc_order_amount": agoraweb['purchase_order_monto'],
           "trx_usuario1": metadata['1'],
           "trx_usuario2": metadata['2'],
           "trx_usuario3": metadata['3'],
           "trx_usuario4": metadata['4'],
           "trx_usuario5": metadata['5'],
           "trx_usuario6": metadata['6'],
           "trx_usuario7": metadata['7'],
           "trx_usuario8": metadata['8'],
           "trx_llave": metadata['ref_id'],
           "trx_id_con": metadata['ref_id_con'],
           "trx_journal": metadata['journal'],
       }
       return output_json
   
   
   def add_fixed_values_header(ti, **kwargs):
       """Add the constant-valued columns to the record produced by ``fetch_header``.

       Pulls the upstream record through ``ti.xcom_pull``, sets six fixed
       fields on that same dict and returns it.
       """
       pulled = ti.xcom_pull(task_ids=['fetch_header'])
       header = pulled[0]
       header.update({
           'trx_ip': 0,
           'trx_vuelto': 'NULL',
           'trx_donacion': 'NULL',
           'trx_nro_lote': 'NULL',
           'trx_lote_abono_com': 0,
           'trx_lote_abono_ban': 0,
       })
       return header
   
   
   def schema_validation_header(ti):
       """Validate the header record against the expected field/type table.

       The record is pulled from the ``add_fixed_values_header`` task. Each
       listed field is checked with ``And(Use(<type>))``. Any error raised by
       ``validate`` propagates to the caller. The value returned is the record
       as pulled, not the output of ``validate``.
       """
       from schema import Schema, And, Use

       record = ti.xcom_pull(task_ids=['add_fixed_values_header'])[0]

       # (field name, converter) pairs, in the order the schema declares them.
       field_types = (
           ("trx_id", int),
           ("trx_tipo_trx", str),
           ("trx_emisor", str),
           ("trx_comercio", str),
           ("trx_local", int),
           ("trx_pos", int),
           ("trx_boleta", str),
           ("trx_fecha", str),
           ("trx_hora", str),
           ("trx_numero", str),
           ("trx_estado", int),
           ("trx_cod_rechazo", str),
           ("trx_glosa_rech", str),
           ("trx_tarjeta", str),
           ("trx_monto", int),
           ("trx_cuotas", int),
           ("trx_ult4_dig", str),
           ("trx_ts_req", str),
           ("trx_ts_rsp", str),
           ("trx_numtarjeta", str),
           ("trx_codautor", str),
           ("trx_cadena", int),
           ("trx_vend_caj", str),
           ("trx_fechacont", str),
           ("trx_oc_merchant_code", str),
           ("trx_oc_merchant_description", str),
           ("trx_oc_order_code", str),
           ("trx_oc_order_amount", str),
           #FIXED
           ("trx_version", str),
           ("trx_journal", str),
           ("trx_id_con", str),
           ("trx_pais", str),
           ("trx_estado_portal", int),
           ("trx_llave", str),
           ("trx_mpago", str),
           ("trx_pin", str),
           ("trx_ip", str),
           ("trx_vuelto", str),
           ("trx_donacion", str),
           ("trx_nro_lote", str),
           ("trx_usuario1", str),
           ("trx_usuario2", str),
           ("trx_usuario3", str),
           ("trx_usuario4", str),
           ("trx_usuario5", str),
           ("trx_usuario6", str),
           ("trx_usuario7", str),
           ("trx_usuario8", str),
           ("trx_obs", str),
           ("trx_lote_abono_com", int),
           ("trx_lote_abono_ban", int),
       )
       conf_schema = Schema(
           {field: And(Use(converter)) for field, converter in field_types}
       )
       conf_schema.validate(record)
       return record
   
   
   def generate_sql_header(ti):
       record = ti.xcom_pull(task_ids=['schema_validation_header'])[0]
       SQL = ""
       table = "{ censored }"
       SQL+='{ censored }'
       SQL+="""
           INSERT INTO {table} (
               trx_id,
               trx_tipo_trx, trx_emisor, trx_comercio, trx_local, 
               trx_pos, trx_boleta, trx_fecha, trx_hora,
               trx_numero, trx_estado, trx_cod_rechazo, trx_glosa_rech,
               trx_tarjeta, trx_monto, trx_cuotas, trx_ult4_dig,
               trx_ts_req, trx_ts_rsp, trx_numtarjeta, trx_codautor,
               trx_cadena, trx_vend_caj, trx_fechacont, trx_oc_merchant_code,
               trx_oc_merchant_description, trx_oc_order_code, 
trx_oc_order_amount, trx_version,
               trx_journal, trx_id_con, trx_pais, trx_estado_portal, 
               trx_llave, trx_mpago, trx_pin, trx_ip, 
               trx_vuelto, trx_donacion, trx_nro_lote, trx_usuario1,
               trx_usuario2, trx_usuario3, trx_usuario4, trx_usuario5,
               trx_usuario6, trx_usuario7, trx_usuario8, trx_obs,
               trx_lote_abono_com, trx_lote_abono_ban
           ) VALUES (
               { censored },
               '{trx_tipo_trx}', '{trx_emisor}', {trx_comercio}, {trx_local}, 
               {trx_pos}, {trx_boleta}, '{trx_fecha}', '{trx_hora}',
               {trx_numero}, {trx_estado}, {trx_cod_rechazo}, 
'{trx_glosa_rech}',
               '{trx_tarjeta}', {trx_monto}, {trx_cuotas}, '{trx_ult4_dig}',
               {trx_ts_req}, {trx_ts_rsp}, '{trx_numtarjeta}', {trx_codautor},
               {trx_cadena}, {trx_vend_caj}, '{trx_fechacont}', 
{trx_oc_merchant_code},
               {trx_oc_merchant_description}, {trx_oc_order_code}, 
{trx_oc_order_amount}, {trx_version},
               {trx_journal}, '{trx_id_con}', {trx_pais}, {trx_estado_portal}, 
               '{trx_llave}', {trx_mpago}, {trx_pin}, {trx_ip}, 
               {trx_vuelto}, {trx_donacion}, {trx_nro_lote}, {trx_usuario1},
               {trx_usuario2}, {trx_usuario3}, {trx_usuario4}, {trx_usuario5},
               {trx_usuario6}, {trx_usuario7}, {trx_usuario8}, {trx_obs},
               {trx_lote_abono_com}, {trx_lote_abono_ban}
           );
       """.format(
           table=table, 
           trx_tipo_trx=record["trx_tipo_trx"],
           trx_emisor=record["trx_emisor"],
           trx_comercio=record["trx_comercio"],
           trx_local=record["trx_local"],
           trx_pos=record["trx_pos"],
           trx_boleta=record["trx_boleta"],
           trx_fecha=record['trx_fecha'],
           trx_hora=record['trx_hora'],
           trx_numero=record['trx_numero'],
           trx_estado=record["trx_estado"],
           trx_cod_rechazo=record["trx_cod_rechazo"] if 
record["trx_cod_rechazo"] else '0',
           trx_glosa_rech=record["trx_glosa_rech"],
           trx_tarjeta=record["trx_tarjeta"],
           trx_monto=record["trx_monto"],
           trx_cuotas=record["trx_cuotas"],
           trx_ult4_dig=record["trx_ult4_dig"],
           trx_ts_req=("'" + record["trx_ts_req"] + "'") if 
record["trx_ts_req"] else 'NULL',
           trx_ts_rsp=("'" + record["trx_ts_rsp"] + "'") if 
record["trx_ts_rsp"] else 'NULL',
           trx_numtarjeta=record["trx_numtarjeta"],
           trx_codautor=("'" + record["trx_codautor"] + "'") if 
record["trx_codautor"] else 'NULL',
           trx_cadena=record["trx_cadena"],
           trx_vend_caj=record["trx_vend_caj"],
           trx_fechacont=record["trx_fechacont"],
           trx_oc_merchant_code=("'" + record["trx_oc_merchant_code"] + "'") if 
record["trx_oc_merchant_code"] else 'NULL',
           trx_oc_merchant_description=("'" + 
record["trx_oc_merchant_description"] + "'") if 
record["trx_oc_merchant_description"] else 'NULL',
           trx_oc_order_code=("'" + record["trx_oc_order_code"] + "'") if 
record["trx_oc_order_code"] else 'NULL',
           trx_oc_order_amount=record["trx_oc_order_amount"] if 
record["trx_oc_order_amount"] else 'NULL',
           trx_version=record["trx_version"] if record["trx_version"] else 
'NULL',
           trx_journal=("'" + record["trx_journal"] + "'") if 
record["trx_journal"] else 'NULL',
           trx_id_con=record["trx_id_con"],
           trx_pais=record["trx_pais"],
           trx_estado_portal=record["trx_estado_portal"],
           trx_llave=record["trx_llave"],
           trx_mpago=("'" + record["trx_mpago"] + "'") if record["trx_mpago"] 
else 'NULL',
           trx_pin=record["trx_pin"],
           trx_ip=record["trx_ip"],
           trx_vuelto=record["trx_vuelto"],
           trx_donacion=record["trx_donacion"],
           trx_nro_lote=record["trx_nro_lote"],
           trx_usuario1=record["trx_usuario1"],
           trx_usuario2=record["trx_usuario2"],
           trx_usuario3=("'" + str(record["trx_usuario3"]) + "'") if 
record["trx_usuario3"] else 'NULL',
           trx_usuario4=("'" + str(record["trx_usuario4"]).replace("'", '') + 
"'") if record["trx_usuario4"] else 'NULL',
           trx_usuario5=("'" + record["trx_usuario5"] + "'") if 
record["trx_usuario5"] else 'NULL',
           trx_usuario6=("'" + str(record["trx_usuario6"]) + "'") if 
record["trx_usuario6"] else 'NULL',
           trx_usuario7=("'" + str(record["trx_usuario7"]).replace("'", '') + 
"'") if record["trx_oc_merchant_description"] else 'NULL',
           trx_usuario8=("'" + str(record["trx_usuario8"]) + "'") if 
record["trx_usuario8"] else 'NULL',
           trx_obs=("'" + str(record["trx_obs"]) + "'") if record["trx_obs"] 
else 'NULL',
           trx_lote_abono_com=record["trx_lote_abono_com"],
           trx_lote_abono_ban=record["trx_lote_abono_ban"],
       )
       return SQL
   
   
   # Task definitions. Each assignment rebinds the function name to the
   # operator that wraps it, so from here on the names refer to tasks.
   # NOTE(review): provide_context is kept as posted; it is reportedly
   # deprecated and unnecessary on Airflow 2 -- confirm against the deployed
   # version before removing.
   fetch_header = PythonOperator(
       task_id='fetch_header',
       provide_context=True,
       python_callable=fetch_header,
       on_failure_callback=on_failure_callback,
       dag=dag
   )

   add_fixed_values_header = PythonOperator(
       task_id='add_fixed_values_header',
       provide_context=True,
       python_callable=add_fixed_values_header,
       on_failure_callback=on_failure_callback,
       dag=dag
   )

   schema_validation_header = PythonOperator(
       task_id='schema_validation_header',
       provide_context=True,
       python_callable=schema_validation_header,
       on_failure_callback=on_failure_callback,
       dag=dag
   )

   generate_sql_header = PythonOperator(
       task_id='generate_sql_header',
       provide_context=True,
       python_callable=generate_sql_header,
       on_failure_callback=on_failure_callback,
       dag=dag
   )

   # Runs the SQL text returned by generate_sql_header, pulled via XCom in
   # the templated `sql` field. The connection id depends on
   # BBR_ENVIRONMENT. No failure callback is attached to this task.
   execute_sql = PostgresOperator(
       task_id="execute_sql",
       postgres_conn_id='ple_postgres_' + os.environ['BBR_ENVIRONMENT'],
       sql="{{ ti.xcom_pull(task_ids=['generate_sql_header'])[0] }}",
       dag=dag
   )

   # Linear pipeline; parenthesized so the chain can span several lines.
   (
       fetch_header
       >> add_fixed_values_header
       >> schema_validation_header
       >> generate_sql_header
       >> execute_sql
   )
   ``` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to