nailo2c opened a new pull request, #52926:
URL: https://github.com/apache/airflow/pull/52926
Closes: #35012
# Why
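1. Port the `ssl_verify_cert` option from `pydruid` to `DruidDbApiHook.get_conn`, so TLS certificate verification can be configured through the connection extras.
2. `get_pandas_df` was already implemented in #35494; it is re-tested below.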
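# How
Read `ssl_verify_cert` from the connection extras, defaulting to `True`, and pass it through to `pydruid`'s `connect`:
```python
druid_broker_conn = connect(
    ...
    ssl_verify_cert=conn.extra_dejson.get("ssl_verify_cert", True),
)
```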
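The lookup above falls back to `True` when the key is absent, so existing connections keep verifying certificates. One caveat: `extra_dejson` is the parsed JSON, so only a JSON boolean becomes a Python `bool`; a quoted `"false"` arrives as the string `'false'`, not as `False`. A minimal stand-alone sketch of that lookup, using plain `json` in place of Airflow's `Connection.extra_dejson` (the helper name `ssl_verify_from_extra` is hypothetical):

```python
import json
from typing import Any


def ssl_verify_from_extra(extra_json: str) -> Any:
    """Mimic `conn.extra_dejson.get("ssl_verify_cert", True)` on a raw extra string.

    Hypothetical helper for illustration only. Like `extra_dejson`,
    an empty extra field is treated as an empty dict.
    """
    extra = json.loads(extra_json) if extra_json else {}
    # Key absent -> default True, i.e. certificates are verified.
    return extra.get("ssl_verify_cert", True)


if __name__ == "__main__":
    print(ssl_verify_from_extra('{"endpoint": "/druid/v2/sql", "ssl_verify_cert": false}'))  # False
    print(ssl_verify_from_extra('{"endpoint": "/druid/v2/sql"}'))  # True
    print(repr(ssl_verify_from_extra('{"ssl_verify_cert": "false"}')))  # 'false' (a str, not a bool)
```

This is why the connection extras in the examples use unquoted `true`/`false`.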
# What
## ssl_verify_cert
Create two connections, one with `ssl_verify_cert` set to `true` and one with it set to `false`:
```console
airflow connections add 'druid_ssl_true' \
--conn-type 'druid' \
--conn-host 'broker' \
--conn-port '8082' \
--conn-schema 'https' \
--conn-extra '{"endpoint": "/druid/v2/sql", "ssl_verify_cert": true}'
airflow connections add 'druid_ssl_false' \
--conn-type 'druid' \
--conn-host 'broker' \
--conn-port '8082' \
--conn-schema 'https' \
--conn-extra '{"endpoint": "/druid/v2/sql", "ssl_verify_cert": false}'
```
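As an alternative to the CLI, Airflow (2.3+) can also read a JSON-serialized connection from an `AIRFLOW_CONN_<CONN_ID>` environment variable. A minimal sketch, assuming the same host, port, schema and extras as the commands above; the helper `druid_conn_env` is hypothetical:

```python
import json
import os


def druid_conn_env(conn_id: str, ssl_verify_cert: bool) -> tuple[str, str]:
    """Return (env var name, JSON value) for a Druid connection.

    Hypothetical helper that mirrors the `airflow connections add` commands.
    """
    value = {
        "conn_type": "druid",
        "host": "broker",
        "port": 8082,
        "schema": "https",
        "extra": {"endpoint": "/druid/v2/sql", "ssl_verify_cert": ssl_verify_cert},
    }
    # Airflow looks connections up as AIRFLOW_CONN_ + the upper-cased conn_id.
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(value)


if __name__ == "__main__":
    for conn_id, verify in [("druid_ssl_true", True), ("druid_ssl_false", False)]:
        name, value = druid_conn_env(conn_id, verify)
        os.environ[name] = value
        print(name, value)
```

Setting `os.environ` only affects the current process and its children; in practice these variables would be exported in the environment of the scheduler and workers.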
Create a DAG to test `ssl_verify_cert`:
```python
import logging
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook


def test_druid_ssl_verify(conn_id):
    """Open a Druid connection, log its ssl_verify_cert value, and run a trivial query."""
    hook = DruidDbApiHook(druid_broker_conn_id=conn_id)
    conn = None  # keeps the finally block safe if get_conn() itself raises
    try:
        conn = hook.get_conn()
        logging.info("=== conn.ssl_verify_cert ===")
        logging.info(conn.ssl_verify_cert)
        cur = conn.cursor()
        cur.execute("SELECT 1")
        result = cur.fetchall()
        logging.info("Druid query result: %s", result)
    except Exception as e:
        logging.error("Connection failed: %s", e)
        raise AirflowException(f"SSL test failed for connection {conn_id}: {e}") from e
    finally:
        if conn is not None:
            conn.close()


with DAG(
    dag_id="test_druid_ssl_verify",
    start_date=datetime(2025, 7, 1),
    schedule=None,
    catchup=False,
    tags=["druid", "ssl-test"],
) as dag:
    test_conn_ssl_true = PythonOperator(
        task_id="test_ssl_verify_true",
        python_callable=test_druid_ssl_verify,
        op_args=["druid_ssl_true"],
    )
    test_conn_ssl_false = PythonOperator(
        task_id="test_ssl_verify_false",
        python_callable=test_druid_ssl_verify,
        op_args=["druid_ssl_false"],
    )

    test_conn_ssl_true >> test_conn_ssl_false
```
Result (matches expectations):
+ `ssl_verify_cert=true`
<img width="1905" alt="druid_ssl_true"
src="https://github.com/user-attachments/assets/faad3e75-b494-4a98-b6a4-2770108cd27d"
/>
+ `ssl_verify_cert=false`
<img width="1904" alt="druid_ssl_false"
src="https://github.com/user-attachments/assets/0dd1d1e7-71a6-405c-ade0-46136e8ef9fd"
/>
## get_pandas_df
Example DAG:
```python
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook


def test_druid_get_pandas_df(conn_id: str):
    """Run a trivial query through the hook and log the resulting DataFrame."""
    hook = DruidDbApiHook(druid_broker_conn_id=conn_id)
    try:
        df = hook.get_pandas_df("SELECT 1")
        logging.info("Druid returned dataframe:\n%s", df)
    except Exception as e:
        logging.error("Druid get_pandas_df failed: %s", e)
        raise


with DAG(
    dag_id="test_druid_get_pandas_df",
    start_date=datetime(2025, 7, 1),
    schedule=None,
    catchup=False,
    tags=["druid", "ssl", "get_pandas_df"],
) as dag:
    run_query = PythonOperator(
        task_id="run_druid_query",
        python_callable=test_druid_get_pandas_df,
        op_args=["druid_ssl_false"],
    )
```
Result:
<img width="1909" alt="druid_pd_df"
src="https://github.com/user-attachments/assets/8b96bce7-ad28-4816-a129-1e987f9153a1"
/>
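Conceptually, `get_pandas_df` runs the query over the hook's DB-API connection and wraps the returned rows in a DataFrame. The sketch below illustrates only that idea, with the standard-library `sqlite3` module standing in for the Druid broker; it is not the hook's actual implementation (presumably inherited from the common SQL provider's `DbApiHook`), and the helper name `rows_to_dataframe` is hypothetical:

```python
import sqlite3
from contextlib import closing

import pandas as pd


def rows_to_dataframe(conn, sql: str) -> pd.DataFrame:
    """Execute `sql` on any DB-API 2.0 connection and return the rows as a DataFrame."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        # cursor.description holds one 7-item sequence per column; item 0 is the name.
        columns = [col[0] for col in cur.description]
        return pd.DataFrame(cur.fetchall(), columns=columns)


if __name__ == "__main__":
    # In-memory SQLite stands in for the Druid broker here.
    with closing(sqlite3.connect(":memory:")) as conn:
        print(rows_to_dataframe(conn, "SELECT 1 AS one"))
```

Because it relies only on `cursor()`, `execute`, `description` and `fetchall`, the same helper would work with any DB-API 2.0 connection.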