rustikk opened a new issue #21112:
URL: https://github.com/apache/airflow/issues/21112
### Apache Airflow version
2.2.3 (latest released)
### What happened
When trying to run the HttpOperator, the task goes straight to 'up_for_retry' instead of running. It never fails with a helpful error message; it just stays stuck in 'up_for_retry'.
### What you expected to happen
I expected the HttpOperator to fail helpfully.
### How to reproduce
```python
import shutil
from contextlib import closing
from datetime import timedelta
import pandas as pd
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
# Notification recipients, managed centrally as an Airflow Variable.
email = Variable.get("email_list")

# Defaults applied to every task in this DAG unless overridden per task.
default_args = dict(
    owner="Airflow",
    depends_on_past=False,
    start_date=days_ago(1),
    email=email,
    email_on_failure=False,
    email_on_retry=False,
    retries=3,
    retry_delay=timedelta(minutes=3),
)
def select_columns(src="/tmp/output.csv", dst="/tmp/final.csv"):
    """Reduce the raw CSV to the three columns loaded into Postgres.

    Reads *src*, drops rows whose "Journal title" is null, adds an
    ``updated_at`` column holding the literal string ``"now()"``, and writes
    the "Journal title", "Publisher", "updated_at" columns to *dst* with no
    header and no index.

    :param src: input CSV path (defaults to the original hard-coded location,
        so the existing no-argument PythonOperator call is unaffected)
    :param dst: output CSV path (same backward-compatible default)
    """
    df = pd.read_csv(src, encoding="unicode_escape")
    # .copy() fixes chained assignment on a filtered slice: assigning a new
    # column to a view raises SettingWithCopyWarning and may silently fail
    # to modify the frame.
    filtered_df = df[df["Journal title"].notnull()].copy()
    # NOTE(review): "now()" is written as a literal string into what is a
    # timestamp column downstream — presumably relying on Postgres input
    # parsing during COPY; verify this loads as intended.
    filtered_df["updated_at"] = "now()"
    filtered_df[["Journal title", "Publisher", "updated_at"]].to_csv(
        dst, index=False, header=False
    )
def load_to_db():
    """Bulk-load /tmp/final.csv into the qa_data_csv table via COPY."""
    # NOTE(review): connection id is hard-coded; assumes a 'local_postgres'
    # Airflow connection exists — this issue is precisely about the unhelpful
    # failure mode when a connection is missing.
    psql = PostgresHook("local_postgres")
    # File is opened "r+" (read + write) because it is both streamed to
    # Postgres and truncated below.
    with open("/tmp/final.csv", "r+") as f:
        with closing(psql.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                # HEADER is specified even though select_columns writes the
                # CSV with header=False — TODO confirm the first data row is
                # not being skipped.
                sql = "COPY qa_data_csv FROM STDIN DELIMITER ',' CSV HEADER"
                cur.copy_expert(sql, f)
                # copy_expert has consumed f, so tell() is at EOF here;
                # truncating at EOF is effectively a no-op — presumably this
                # was meant to clear the file, TODO confirm intent.
                f.truncate(f.tell())
            # Commit only after the cursor has closed cleanly.
            conn.commit()
def check(response):
    """Persist the HTTP response body and report whether it was non-empty."""
    # Write the payload to a local working copy, then mirror it to /tmp
    # where the downstream tasks expect it.
    with open("output.csv", "w") as sink:
        sink.write(str(response))
    shutil.copyfile("output.csv", "/tmp/output.csv")
    ok = bool(response)
    print("Returning True" if ok else "Returning False")
    return ok
# DAG container for the pipeline. schedule_interval=None, so it only runs
# when triggered manually. NOTE(review): start_date here duplicates the one
# already supplied via default_args.
dag = DAG(
    "dag_load_db_httpoptr_csv",
    start_date=days_ago(1),
    schedule_interval=None,
    default_args=default_args,
    tags=["core", "http"],
)
# Fetch the journals CSV over HTTP; check() stores the body to /tmp for the
# downstream tasks and returns True/False for the response check.
t1 = SimpleHttpOperator(
    task_id="get_csv_data",
    start_date=days_ago(1),  # redundant: already set via default_args and DAG
    method="GET",
    http_conn_id="http_default",
    endpoint="/system/files/Journals.csv",
    # check() already returns True/False, so the former
    # `True if check(...) is True else False` wrapper was redundant.
    response_check=lambda response: check(response.text),
    headers={
        "Content-Type": "application/json",
        # SECURITY: hard-coded API key committed in source — move it into an
        # Airflow Connection/Variable or a secrets backend.
        "user-key": "02e6b9f4f59ed2504ba5ee836ce393d5",
    },
    do_xcom_push=True,
    dag=dag,
)
table = "qa_data_csv"

# BUG FIX: t2 was not attached to the DAG (missing dag=dag), so it could not
# be scheduled with the rest of the pipeline.
t2 = PythonOperator(task_id="select_columns", python_callable=select_columns, dag=dag)

psql1 = PostgresOperator(
    task_id="drop_db_table",
    postgres_conn_id="local_postgres",
    sql=f"DROP TABLE IF EXISTS {table}",
    dag=dag,
)

# FIX: the CREATE TABLE statement was broken across two source lines inside
# the string literal (a SyntaxError); rejoined via implicit concatenation.
psql2 = PostgresOperator(
    task_id="create_db_table",
    postgres_conn_id="local_postgres",
    sql=(
        f"CREATE TABLE IF NOT EXISTS {table} "
        "(title VARCHAR(500), Publisher VARCHAR(500), updated_at timestamp)"
    ),
    dag=dag,
)

psql3 = PythonOperator(task_id="load_to_db", python_callable=load_to_db, dag=dag)

# fetch CSV -> filter columns -> drop table -> recreate table -> bulk load
t1 >> t2 >> psql1 >> psql2 >> psql3
```
### Operating System
Docker (debian:buster)
### Versions of Apache Airflow Providers
Postgres 12
### Deployment
Astronomer
### Deployment details
Astro CLI with image:
quay.io/astronomer/ap-airflow-dev:2.2.3-2
### Anything else
This happens every time.
The HTTP connection isn't configured for this DAG, but the task should still fail with a helpful error message when no connection has been set.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]