jaetma commented on issue #33688:
URL: https://github.com/apache/airflow/issues/33688#issuecomment-1694910574
Hello @sunank200
This is the DAG; it's just a simple query insertion:
```
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import timedelta, datetime
import logging
import os
# Logger registered under the "airflow.task" name.
logger = logging.getLogger("airflow.task")

default_run_days = 1

# Schedule per deployment environment; every environment currently maps to None.
interval = dict.fromkeys(("dev", "qa", "prod"))

# Notification recipients per deployment environment.
emails = {
    "dev": ["{ censored }"],
    "qa": ["{ censored }"],
    "prod": ["{ censored }"],
}

# Default arguments handed to the DAG; the recipient list is selected by the
# BBR_ENVIRONMENT environment variable (KeyError when it is unset or unknown).
default_args = dict(
    owner="admin",
    depends_on_past=False,
    start_date=days_ago(0),
    email=emails[os.environ["BBR_ENVIRONMENT"]],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=2),
)
def on_failure_callback(context):
    """Send a failure report for a task by e-mail through SendGrid.

    Args:
        context: Mapping passed to the callback. ``context['task_instance']``
            must provide ``dag_id`` and ``task_id``; ``context['exception']``
            is included in the report when present.

    Raises:
        KeyError: If ``task_instance`` is missing from ``context``, or if the
            ``BBR_ENVIRONMENT`` environment variable is unset or has no entry
            in ``emails``.
        TypeError, OSError: If ``SENDGRID_API_KEY`` is unset or does not name
            a readable file.
    """
    import html
    import traceback

    from sendgrid import SendGridAPIClient
    from sendgrid.helpers.mail import Mail

    ti = context['task_instance']
    print("FAILURE CALLBACK")

    exception = context.get('exception')
    if isinstance(exception, BaseException):
        # Positional arguments on purpose: the ``etype=`` keyword is no longer
        # accepted by traceback.format_exception on Python 3.10+.
        formatted_exception = ''.join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        ).strip()
    else:
        # No exception object available (missing key or a non-exception
        # value); report whatever was provided instead of crashing here.
        formatted_exception = str(exception)

    environment = os.environ['BBR_ENVIRONMENT']
    message = Mail(
        from_email=os.environ.get('SENDGRID_MAIL_FROM'),
        to_emails=emails[environment],
        subject=f'Airflow error: {ti.dag_id}, Env:' + str(environment),
        # The traceback is escaped because it routinely contains text such as
        # "<module>" that would otherwise be parsed as HTML markup.
        html_content=f"""
        <p>ENV: {environment}</p>
        <p>DAG ID: {ti.dag_id}</p>
        <p>TASK ID: {ti.task_id}</p>
        <p>DATE: {str(datetime.now())}</p>
        <p>EXCEPTION: {html.escape(formatted_exception)}</p>
        """
    )

    # SENDGRID_API_KEY is used as the path of a file whose content is the key;
    # the context manager closes the file once it has been read.
    with open(os.environ.get('SENDGRID_API_KEY')) as key_file:
        api_key = key_file.read().replace('\n', '')

    sg = SendGridAPIClient(api_key)
    sg.send(message)
# DAG definition; the schedule is looked up for the environment named by the
# BBR_ENVIRONMENT environment variable.
dag = DAG(
    dag_id='portalle_sale_transaction',
    schedule_interval=interval[os.environ['BBR_ENVIRONMENT']],
    default_args=default_args,
    catchup=False,
    max_active_runs=20,
    tags=['portalle'],
)
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
_MIDNIGHT_FORMAT = '%Y-%m-%d 00:00:00'


def _format_epoch(epoch_seconds, fmt):
    """Format a Unix timestamp (in seconds) as a UTC date/time string."""
    # Local import: only ``datetime`` and ``timedelta`` are imported at file level.
    from datetime import timezone

    # Timezone-aware replacement for the deprecated datetime.utcfromtimestamp;
    # the formatted text is the same.
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime(fmt)


def fetch_header(ti, **kwargs):
    """Flatten the DAG-run configuration into a dict of ``trx_*`` header fields.

    Args:
        ti: Task instance (not used by this function).
        **kwargs: Must contain ``dag_run``, whose ``conf`` attribute is the
            nested transaction payload.

    Returns:
        dict: The header fields, keyed by ``trx_*`` name.

    Raises:
        KeyError: If ``dag_run`` or any expected payload key is missing.
    """
    record = kwargs["dag_run"].conf
    # NOTE(review): this prints the whole payload, including
    # payment.card_number and payment.pin - confirm that is acceptable.
    print(record)

    acquirer = record['acquirer']
    merchant = record['merchant']
    transaction = record['transaction']
    status = record['status']
    payment = record['payment']
    agoraweb = record['agoraweb']
    metadata = record['metadata']

    requested_at = transaction['requested_at']
    accounted_at = transaction['accounted_at']
    requested_ts = _format_epoch(requested_at, _TIMESTAMP_FORMAT)
    # A falsy accounted_at (None, 0, ...) yields None rather than a timestamp.
    accounted_ts = (
        _format_epoch(accounted_at, _TIMESTAMP_FORMAT) if accounted_at else None
    )

    output_json = {
        "trx_id": 1,
        "trx_tipo_trx": acquirer['transaction_type_code'],
        "trx_emisor": acquirer['acquirer_code'],
        "trx_comercio": merchant['merchant_code'],
        "trx_local": merchant['store_code'],
        "trx_pos": merchant['terminal_code'],
        "trx_pais": merchant['country'],
        "trx_boleta": transaction['document_number'],
        # Both fields carry the full requested_at timestamp.
        "trx_fecha": requested_ts,
        "trx_hora": requested_ts,
        "trx_numero": transaction['transaction_number'],
        "trx_estado": status['status_code'],
        "trx_cod_rechazo": status['rejected_code'],
        "trx_glosa_rech": status['rejected_description'],
        "trx_estado_portal": status['portal'],
        "trx_version": status['version'],
        "trx_obs": status['obs'],
        "trx_tarjeta": acquirer['card_type_code'],
        "trx_monto": payment['amount'],
        "trx_cuotas": payment['installments'],
        # Fixed to 0; taking the last 4 characters of payment.card_number
        # was disabled in the original code.
        "trx_ult4_dig": 0,
        # Both fields are derived from accounted_at.
        "trx_ts_req": accounted_ts,
        "trx_ts_rsp": accounted_ts,
        "trx_numtarjeta": payment['card_number'],
        "trx_codautor": payment['authorization_code'],
        "trx_mpago": payment['m'],
        "trx_pin": payment['pin'],
        "trx_cadena": merchant['channel_code'],
        "trx_vend_caj": merchant['operator_code'],
        # Date of requested_at with the time part forced to midnight.
        "trx_fechacont": _format_epoch(requested_at, _MIDNIGHT_FORMAT),
        "trx_oc_merchant_code": agoraweb['purchase_order_merchant_code'],
        "trx_oc_merchant_description":
            agoraweb['purchase_order_merchant_description'],
        "trx_oc_order_code": agoraweb['purchase_order_code'],
        "trx_oc_order_amount": agoraweb['purchase_order_monto'],
        "trx_usuario1": metadata['1'],
        "trx_usuario2": metadata['2'],
        "trx_usuario3": metadata['3'],
        "trx_usuario4": metadata['4'],
        "trx_usuario5": metadata['5'],
        "trx_usuario6": metadata['6'],
        "trx_usuario7": metadata['7'],
        "trx_usuario8": metadata['8'],
        "trx_llave": metadata['ref_id'],
        "trx_id_con": metadata['ref_id_con'],
        "trx_journal": metadata['journal'],
    }
    return output_json
def add_fixed_values_header(ti, **kwargs):
    """Add the constant header fields to the record pulled from ``fetch_header``.

    The pulled dict is updated in place and returned.
    """
    pulled = ti.xcom_pull(task_ids=['fetch_header'])
    header = pulled[0]
    header.update(
        trx_ip=0,
        trx_vuelto='NULL',
        trx_donacion='NULL',
        trx_nro_lote='NULL',
        trx_lote_abono_com=0,
        trx_lote_abono_ban=0,
    )
    return header
def schema_validation_header(ti):
    """Validate the record pulled from ``add_fixed_values_header``.

    Every expected field is checked by attempting a conversion to ``int`` or
    ``str``. Errors raised by ``validate`` propagate to the caller. The record
    is returned exactly as pulled; the converted values produced by
    ``validate`` are discarded.
    """
    from schema import Schema, And, Use

    record = ti.xcom_pull(task_ids=['add_fixed_values_header'])[0]

    # Field name -> conversion that must succeed for the field to be valid.
    field_casts = {
        "trx_id": int,
        "trx_tipo_trx": str,
        "trx_emisor": str,
        "trx_comercio": str,
        "trx_local": int,
        "trx_pos": int,
        "trx_boleta": str,
        "trx_fecha": str,
        "trx_hora": str,
        "trx_numero": str,
        "trx_estado": int,
        "trx_cod_rechazo": str,
        "trx_glosa_rech": str,
        "trx_tarjeta": str,
        "trx_monto": int,
        "trx_cuotas": int,
        "trx_ult4_dig": str,
        "trx_ts_req": str,
        "trx_ts_rsp": str,
        "trx_numtarjeta": str,
        "trx_codautor": str,
        "trx_cadena": int,
        "trx_vend_caj": str,
        "trx_fechacont": str,
        "trx_oc_merchant_code": str,
        "trx_oc_merchant_description": str,
        "trx_oc_order_code": str,
        "trx_oc_order_amount": str,
        # FIXED
        "trx_version": str,
        "trx_journal": str,
        "trx_id_con": str,
        "trx_pais": str,
        "trx_estado_portal": int,
        "trx_llave": str,
        "trx_mpago": str,
        "trx_pin": str,
        "trx_ip": str,
        "trx_vuelto": str,
        "trx_donacion": str,
        "trx_nro_lote": str,
        "trx_usuario1": str,
        "trx_usuario2": str,
        "trx_usuario3": str,
        "trx_usuario4": str,
        "trx_usuario5": str,
        "trx_usuario6": str,
        "trx_usuario7": str,
        "trx_usuario8": str,
        "trx_obs": str,
        "trx_lote_abono_com": int,
        "trx_lote_abono_ban": int,
    }
    conf_schema = Schema(
        {name: And(Use(cast)) for name, cast in field_casts.items()}
    )
    conf_schema.validate(record)
    return record
def _sql_text(value, strip_quotes=False):
    """Render ``value`` as a single-quoted SQL string literal.

    Embedded apostrophes are doubled, or removed when ``strip_quotes`` is
    true, so the value cannot terminate the literal early.
    """
    text = str(value).replace("'", "" if strip_quotes else "''")
    return "'" + text + "'"


def _sql_text_or_null(value):
    """Render a quoted literal, or the keyword NULL when ``value`` is falsy."""
    return _sql_text(value) if value else "NULL"


def _sql_stripped_text_or_null(value):
    """Like ``_sql_text_or_null`` but apostrophes are removed, not doubled."""
    return _sql_text(value, strip_quotes=True) if value else "NULL"


def _sql_raw_or_null(value):
    """Render ``value`` verbatim (unquoted), or NULL when it is falsy."""
    return str(value) if value else "NULL"


def _sql_raw_or_zero(value):
    """Render ``value`` verbatim (unquoted), or 0 when it is falsy."""
    return str(value) if value else "0"


# Column name -> renderer, in INSERT order. Keeping both in one table makes
# it impossible for the column list and the value list to drift apart.
# NOTE(review): columns rendered with ``str`` are interpolated verbatim and
# unquoted, exactly as before; if any of them can carry free text from the
# trigger payload they remain an injection vector - confirm the column types
# or switch the insert to bound parameters.
_HEADER_COLUMN_RENDERERS = (
    ("trx_tipo_trx", _sql_text),
    ("trx_emisor", _sql_text),
    ("trx_comercio", str),
    ("trx_local", str),
    ("trx_pos", str),
    ("trx_boleta", str),
    ("trx_fecha", _sql_text),
    ("trx_hora", _sql_text),
    ("trx_numero", str),
    ("trx_estado", str),
    ("trx_cod_rechazo", _sql_raw_or_zero),
    ("trx_glosa_rech", _sql_text),
    ("trx_tarjeta", _sql_text),
    ("trx_monto", str),
    ("trx_cuotas", str),
    ("trx_ult4_dig", _sql_text),
    ("trx_ts_req", _sql_text_or_null),
    ("trx_ts_rsp", _sql_text_or_null),
    ("trx_numtarjeta", _sql_text),
    ("trx_codautor", _sql_text_or_null),
    ("trx_cadena", str),
    ("trx_vend_caj", str),
    ("trx_fechacont", _sql_text),
    ("trx_oc_merchant_code", _sql_text_or_null),
    ("trx_oc_merchant_description", _sql_text_or_null),
    ("trx_oc_order_code", _sql_text_or_null),
    ("trx_oc_order_amount", _sql_raw_or_null),
    ("trx_version", _sql_raw_or_null),
    ("trx_journal", _sql_text_or_null),
    ("trx_id_con", _sql_text),
    ("trx_pais", str),
    ("trx_estado_portal", str),
    ("trx_llave", _sql_text),
    ("trx_mpago", _sql_text_or_null),
    ("trx_pin", str),
    ("trx_ip", str),
    ("trx_vuelto", str),
    ("trx_donacion", str),
    ("trx_nro_lote", str),
    ("trx_usuario1", str),
    ("trx_usuario2", str),
    ("trx_usuario3", _sql_text_or_null),
    ("trx_usuario4", _sql_stripped_text_or_null),
    ("trx_usuario5", _sql_text_or_null),
    ("trx_usuario6", _sql_text_or_null),
    # The NULL check now looks at trx_usuario7 itself; it previously tested
    # trx_oc_merchant_description by mistake.
    ("trx_usuario7", _sql_stripped_text_or_null),
    ("trx_usuario8", _sql_text_or_null),
    ("trx_obs", _sql_text_or_null),
    ("trx_lote_abono_com", str),
    ("trx_lote_abono_ban", str),
)


def generate_sql_header(ti):
    """Build the INSERT statement for the validated header record.

    The record is pulled from the ``schema_validation_header`` task. The
    ``{ censored }`` strings (statement prefix, table name and the ``trx_id``
    value) appear to be redaction placeholders in the posted source; they are
    emitted verbatim and must be replaced before the SQL can run.

    Args:
        ti: Object whose ``xcom_pull`` returns a list holding the record dict.

    Returns:
        str: The prefix followed by one ``INSERT INTO ... VALUES (...);``.

    Raises:
        KeyError: If the record lacks one of the expected ``trx_*`` keys.
    """
    record = ti.xcom_pull(task_ids=['schema_validation_header'])[0]
    table = "{ censored }"

    columns = ["trx_id"]
    values = ["{ censored }"]
    for column, render in _HEADER_COLUMN_RENDERERS:
        columns.append(column)
        values.append(render(record[column]))

    statement = (
        "\nINSERT INTO " + table + " (\n    "
        + ",\n    ".join(columns)
        + "\n) VALUES (\n    "
        + ",\n    ".join(values)
        + "\n);\n"
    )
    return '{ censored }' + statement
# Task definitions. Each operator is bound to the same module-level name as
# the callable it wraps; the callable is passed before the name is rebound,
# so after these statements the names refer to the operators.
fetch_header = PythonOperator(
    task_id='fetch_header',
    provide_context=True,
    python_callable=fetch_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)
add_fixed_values_header = PythonOperator(
    task_id='add_fixed_values_header',
    provide_context=True,
    python_callable=add_fixed_values_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)
schema_validation_header = PythonOperator(
    task_id='schema_validation_header',
    provide_context=True,
    python_callable=schema_validation_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)
generate_sql_header = PythonOperator(
    task_id='generate_sql_header',
    provide_context=True,
    python_callable=generate_sql_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)
# Runs the SQL text produced by generate_sql_header, pulled through a template.
# NOTE(review): unlike the tasks above, this one has no on_failure_callback -
# confirm whether that is intentional.
execute_sql = PostgresOperator(
    task_id="execute_sql",
    postgres_conn_id='ple_postgres_' + os.environ['BBR_ENVIRONMENT'],
    sql="{{ ti.xcom_pull(task_ids=['generate_sql_header'])[0] }}",
    dag=dag
)

# Linear pipeline. The chain is parenthesised so it can span several lines;
# a line ending in a bare ``>>`` is a syntax error.
(
    fetch_header
    >> add_fixed_values_header
    >> schema_validation_header
    >> generate_sql_header
    >> execute_sql
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]