subbota19 opened a new issue, #46648:
URL: https://github.com/apache/airflow/issues/46648
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.10.3
### What happened?
The issue occurs when executing single or multiple SQL statements with
**SnowflakeSqlApiOperator** without the deferrable flag. After the first poll
against Snowflake (with **poll_interval** set to 5 seconds by default), the
triggered Snowflake job continues running, yet Airflow marks the task as
successful even though the actual Snowflake job is still in progress.
### What you think should happen instead?
Instead of marking the task as successful immediately, the Airflow task
should wait until the Snowflake job has fully completed before marking the task
as successful or failed accordingly.
### How to reproduce
Use the following Airflow task:
```
test = SnowflakeSqlApiOperator(
    task_id="test",
    sql="SELECT SYSTEM$WAIT(10);",
    warehouse="WAREHOUSE_ID",
    database="DATABASE_ID",
)
```
At the end of the query execution, the following log message appears:
[2025-02-11, 14:54:59 UTC] {snowflake_sql_api.py:283} INFO - {'code':
'333334', 'message': 'Asynchronous execution in progress. Use provided query id
to perform query monitoring and management.', 'statementHandle':
'01ba52be-0103-fdba-0000-8c0d3c1e25ca', 'statementStatusUrl':
'/api/v2/statements/01ba52be-0103-fdba-0000-8c0d3c1e25ca'}
[2025-02-11, 14:54:59 UTC] {taskinstance.py:352} INFO - Marking task as
SUCCESS.
However, the task should wait for this job to finish before being marked as
successful or failed.
### Operating System
Linux 5.15.153.1-microsoft-standard-WSL2 x86_64 GNU/Linux
### Versions of Apache Airflow Providers
apache-airflow-providers-snowflake==6.0.0
### Deployment
Astronomer
### Deployment details
Astro Runtime 12.4.0 (Based on Airflow 2.10.3)
### Anything else?
I want to highlight that in deferrable mode, Airflow uses triggers to check
the job status, so every time the poll interval elapses, it checks the status
and yields an event on [this
line](https://github.com/apache/airflow/blob/main/providers/snowflake/src/airflow/providers/snowflake/triggers/snowflake_trigger.py#L89).
However, when not using deferrable logic, execution follows the [else
statement](https://github.com/apache/airflow/blob/main/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py#L456),
which does not include a while loop to monitor execution continuously.
Instead, on [this
line](https://github.com/apache/airflow/blob/main/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py#L457),
the task iterates through the active Snowflake jobs once, checks their status,
and then sleeps.
This means:
- If a job finishes in under 5 seconds, the logic works correctly.
- If a job takes longer than 5 seconds, there is no mechanism to keep
monitoring it, and the task incorrectly finishes after the for loop ends, even
if the Snowflake job is still running.
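The missing behavior is the standard poll-until-terminal-status pattern. A minimal, self-contained sketch of that pattern (the `get_status` callable, the status strings, and the `timeout` handling here are hypothetical illustrations, not the Snowflake provider's actual API):

```
import time

TERMINAL_STATUSES = {"success", "error"}


def wait_for_completion(get_status, poll_interval=5.0, timeout=300.0, sleep=time.sleep):
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout:
            raise TimeoutError(f"query still '{status}' after {timeout}s")
        sleep(poll_interval)
        waited += poll_interval
```

With this shape, a job that reports "running" on the first two polls is only marked done once a later poll returns a terminal status, instead of finishing after a single pass.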
**Suggested Fix**
I propose adding a running-status check inside **poll_on_queries** and
wrapping the else branch in a loop to ensure Airflow waits until all queries
finish:
```
else:
    while True:
        statement_status = self.poll_on_queries()
        if statement_status["error"]:
            raise AirflowException(statement_status["error"])
        if not statement_status["running"]:
            break
    self._hook.check_query_output(self.query_ids)
```
With this change, Airflow would periodically call **poll_on_queries()** and
keep waiting until all jobs complete.
As part of my testing, I ran the overridden operator with my changes, and
this is the log and expected result at the end:
[2025-02-11, 14:55:45 UTC] {snowflake_sql_api.py:289} INFO - Snowflake SQL
GET statements status API response: {'code': '333334', 'message':
'**Asynchronous execution in progress. Use provided query id to perform query
monitoring and management**.', 'statementHandle':
'01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'statementStatusUrl':
'/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e'}
[2025-02-11, 14:55:50 UTC] {snowflake.py:80} INFO - checking :
01ba52bf-0103-fe23-0000-8c0d3c1dcb3e
[2025-02-11, 14:55:50 UTC] {snowflake_sql_api.py:315} INFO - Retrieving
status for query id 01ba52bf-0103-fe23-0000-8c0d3c1dcb3e
[2025-02-11, 14:55:50 UTC] {snowflake_sql_api.py:289} INFO - Snowflake SQL
GET statements status API response: {'resultSetMetaData': {'numRows': 1,
'format': 'jsonv2', 'partitionInfo': [{'rowCount': 1, 'uncompressedSize': 57}],
'rowType': [{'name': 'multiple statement execution', 'database': '', 'schema':
'', 'table': '', 'nullable': False, 'length': 16777216, 'type': 'text',
'scale': None, 'precision': None, 'byteLength': 16777216, 'collation': None}]},
'data': [['Multiple statements executed successfully.']], 'code': '090001',
'statementHandles': ['01ba52bf-0103-fe23-0000-8c0d3c1dcb42',
'01ba52bf-0103-fe23-0000-8c0d3c1dcb4a'], 'statementStatusUrl':
'/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e?requestId=a85631a6-fa5d-4a0e-b4ca-e9800ac36191',
'requestId': 'a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'sqlState': '00000',
'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'message':
'**Statement executed successfully.**', 'createdOn': 1739285703540}
[2025-02-11, 14:55:55 UTC] {snowflake_sql_api.py:283} INFO -
{'resultSetMetaData': {'numRows': 1, 'format': 'jsonv2', 'partitionInfo':
[{'rowCount': 1, 'uncompressedSize': 57}], 'rowType': [{'name': 'multiple
statement execution', 'database': '', 'schema': '', 'table': '', 'nullable':
False, 'length': 16777216, 'type': 'text', 'scale': None, 'precision': None,
'byteLength': 16777216, 'collation': None}]}, 'data': [['**Multiple statements
executed successfully.'**]], 'code': '090001', 'statementHandles':
['01ba52bf-0103-fe23-0000-8c0d3c1dcb42',
'01ba52bf-0103-fe23-0000-8c0d3c1dcb4a'], 'statementStatusUrl':
'/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e?requestId=a85631a6-fa5d-4a0e-b4ca-e9800ac36191',
'requestId': 'a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'sqlState': '00000',
'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'message':
'Statement executed successfully.', 'createdOn': 1739285703540}
[2025-02-11, 14:55:55 UTC] {taskinstance.py:340} ▼ Post task execution logs
[2025-02-11, 14:55:55 UTC] {taskinstance.py:352} INFO - Marking task as
SUCCESS.
At the end I want to say that, of course, long-running jobs should use the
deferrable flag, but as long as the non-deferrable option remains available,
its logic should still function correctly.
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)