fritz-astronomer commented on PR #29599:
URL: https://github.com/apache/airflow/pull/29599#issuecomment-1435100097
Test DAG to fix (see linked issue for Test DAG Demonstrating error):
```
from __future__ import annotations
import os
from datetime import datetime
from typing import Optional, Any, Sequence
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
os.environ["AIRFLOW_CONN_SNOWFLAKE"] = "snowflake://.............."
class SnowflakePatchOperator(SnowflakeOperator):
def _process_output(
self,
results: Optional[list[Any]],
descriptions: list[Sequence[Sequence] | None]
) -> list[Any]:
# Handle do_xcom_push=False
if results is None or results == [None]:
return [None]
validated_descriptions: list[Sequence[Sequence]] = []
for idx, description in enumerate(descriptions):
if not description:
raise RuntimeError(
f"The query did not return descriptions of the cursor
for query number {idx}. "
"Cannot return values in a form of dictionary for that
query."
)
validated_descriptions.append(description)
returned_results = []
for result_id, result_list in enumerate(results):
current_processed_result = []
for row in result_list:
dict_result: dict[Any, Any] = {}
for idx, description in
enumerate(validated_descriptions[result_id]):
dict_result[description[0]] = row[idx]
current_processed_result.append(dict_result)
returned_results.append(current_processed_result)
return returned_results
with DAG('snowflake_test', schedule=None, start_date=datetime(2023, 1, 1)):
SnowflakePatchOperator(
task_id='snowflake_test',
snowflake_conn_id="snowflake",
sql="select 1;",
do_xcom_push=False
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]