subbota19 opened a new issue, #46648:
URL: https://github.com/apache/airflow/issues/46648

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.10.3
   
   ### What happened?
   
   The main issue occurs when executing single or multiple SQL statements with 
**SnowflakeSqlApiOperator** without the deferrable flag. After the first poll 
check against Snowflake (with **poll_interval** set to 5 seconds by default), the 
triggered Snowflake job continues running. However, Airflow marks the task as 
successful, even though the actual Snowflake job is still in progress.
   
   ### What you think should happen instead?
   
   Instead of marking the task as successful immediately, the Airflow task 
should wait until the Snowflake job is fully completed and then be marked as 
successful or failed accordingly.
   
   ### How to reproduce
   
   Use the following Airflow task:
   
   ```python
   test = SnowflakeSqlApiOperator(
       task_id="test",
       sql="SELECT SYSTEM$WAIT(10);",
       warehouse="WAREHOUSE_ID",
       database="DATABASE_ID",
   )
   ```
   At the end of the query execution, the following log message appears:
   
   ```
   [2025-02-11, 14:54:59 UTC] {snowflake_sql_api.py:283} INFO - {'code': '333334', 'message': 'Asynchronous execution in progress. Use provided query id to perform query monitoring and management.', 'statementHandle': '01ba52be-0103-fdba-0000-8c0d3c1e25ca', 'statementStatusUrl': '/api/v2/statements/01ba52be-0103-fdba-0000-8c0d3c1e25ca'}
   [2025-02-11, 14:54:59 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS.
   ```
   
   However, the task should wait for this job to finish before being marked as 
successful or failed.
   
   ### Operating System
   
   Linux 5.15.153.1-microsoft-standard-WSL2 x86_64 GNU/Linux
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-snowflake==6.0.0
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   Astro Runtime 12.4.0 (Based on Airflow 2.10.3)
   
   
   ### Anything else?
   
   I want to highlight that in deferrable mode, Airflow uses triggers to check 
the job status: every time the deferrable interval elapses, it checks the 
status and yields an event on [this 
line](https://github.com/apache/airflow/blob/main/providers/snowflake/src/airflow/providers/snowflake/triggers/snowflake_trigger.py#L89).
   
   However, when the deferrable logic is not used, execution follows the [else 
statement](https://github.com/apache/airflow/blob/main/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py#L456),
 which does not include a while loop to monitor execution continuously. 
Instead, on [this 
line](https://github.com/apache/airflow/blob/main/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py#L457),
 the task iterates through the active Snowflake jobs once, checks their status, 
and then sleeps.
   
   This means:
   
   - If a job finishes in less than 5 seconds, the logic works correctly.
   
   - If a job takes longer than 5 seconds, there is no mechanism to keep 
monitoring it, and the task will incorrectly finish after the for loop ends, 
even if the Snowflake job is still running.
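
   The single-pass behaviour can be illustrated with a simplified model. This is only a sketch of the flaw described above; `single_pass_poll` and `fake_status` are hypothetical names, not the provider's actual code:

   ```python
   import time

   # Hypothetical simplification of the non-deferrable path: each query id is
   # checked exactly once, then the operator sleeps and moves on.
   def single_pass_poll(query_ids, get_status, poll_interval=0.01):
       statuses = {}
       for query_id in query_ids:
           statuses[query_id] = get_status(query_id)
           time.sleep(poll_interval)
       return statuses  # the task is marked SUCCESS right after this returns

   # Simulate a job that needs three polls to finish: the single pass sees it
   # while it is still "running", yet the operator proceeds anyway.
   calls = {"count": 0}

   def fake_status(query_id):
       calls["count"] += 1
       return "success" if calls["count"] >= 3 else "running"

   result = single_pass_poll(["01ba52be-0103"], fake_status)
   print(result)  # {'01ba52be-0103': 'running'} -> task finishes too early
   ```

   Because the status is only read once per query id, a still-running job is never re-checked.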
   
   **Suggested Fix**
   I propose adding a running-status check inside `poll_on_queries` and 
replacing the else statement with a loop so that Airflow waits until all 
queries finish:
   
   ```python
           else:
               while True:
                   # poll_on_queries checks each active query id and sleeps
                   # for poll_interval between checks
                   statement_status = self.poll_on_queries()
                   if statement_status["error"]:
                       raise AirflowException(statement_status["error"])
                   if not statement_status["running"]:
                       break
               self._hook.check_query_output(self.query_ids)
   ```
   With this change, Airflow would periodically call `poll_on_queries()` and 
continue waiting until all jobs complete.
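   
   One possible shape for the running-status check inside `poll_on_queries` is sketched below. The return dictionary, the helper names, and the mapping of status codes are assumptions based on the Snowflake SQL API responses quoted in the logs above, not the provider's actual implementation:

   ```python
   import time

   # Status codes taken from the Snowflake SQL API responses quoted in the logs above.
   RUNNING_CODE = "333334"   # "Asynchronous execution in progress"
   SUCCESS_CODE = "090001"   # "Statement executed successfully"

   def poll_on_queries(query_ids, get_status_response, poll_interval=0.01):
       """Hypothetical sketch of the proposed contract: report which query ids
       are still running and collect any error messages seen so far."""
       running, errors = [], []
       for query_id in query_ids:
           response = get_status_response(query_id)
           code = response.get("code")
           if code == RUNNING_CODE:
               running.append(query_id)
           elif code != SUCCESS_CODE:
               errors.append(response.get("message", f"query {query_id} failed"))
       if running:
           time.sleep(poll_interval)  # back off before the caller polls again
       return {"running": running, "error": errors}

   # Fake API: the first poll reports the statement as still running,
   # the second reports success.
   responses = iter([
       {"code": "333334", "message": "Asynchronous execution in progress."},
       {"code": "090001", "message": "Statement executed successfully."},
   ])

   def fake_get_status(query_id):
       return next(responses)

   while True:
       status = poll_on_queries(["01ba52bf-0103"], fake_get_status)
       if status["error"]:
           raise RuntimeError(status["error"])
       if not status["running"]:
           break
   print(status)  # {'running': [], 'error': []}
   ```

   The caller's loop stays simple because all per-query bookkeeping (and the sleep between polls) lives inside `poll_on_queries`.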
   
   As part of my testing, I ran the overridden operator with my changes, and 
this is the log and expected result at the end:
   
   ```
   [2025-02-11, 14:55:45 UTC] {snowflake_sql_api.py:289} INFO - Snowflake SQL GET statements status API response: {'code': '333334', 'message': 'Asynchronous execution in progress. Use provided query id to perform query monitoring and management.', 'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'statementStatusUrl': '/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e'}
   [2025-02-11, 14:55:50 UTC] {snowflake.py:80} INFO - checking : 01ba52bf-0103-fe23-0000-8c0d3c1dcb3e
   [2025-02-11, 14:55:50 UTC] {snowflake_sql_api.py:315} INFO - Retrieving status for query id 01ba52bf-0103-fe23-0000-8c0d3c1dcb3e
   [2025-02-11, 14:55:50 UTC] {snowflake_sql_api.py:289} INFO - Snowflake SQL GET statements status API response: {'resultSetMetaData': {'numRows': 1, 'format': 'jsonv2', 'partitionInfo': [{'rowCount': 1, 'uncompressedSize': 57}], 'rowType': [{'name': 'multiple statement execution', 'database': '', 'schema': '', 'table': '', 'nullable': False, 'length': 16777216, 'type': 'text', 'scale': None, 'precision': None, 'byteLength': 16777216, 'collation': None}]}, 'data': [['Multiple statements executed successfully.']], 'code': '090001', 'statementHandles': ['01ba52bf-0103-fe23-0000-8c0d3c1dcb42', '01ba52bf-0103-fe23-0000-8c0d3c1dcb4a'], 'statementStatusUrl': '/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e?requestId=a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'requestId': 'a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'sqlState': '00000', 'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'message': 'Statement executed successfully.', 'createdOn': 1739285703540}
   [2025-02-11, 14:55:55 UTC] {snowflake_sql_api.py:283} INFO - {'resultSetMetaData': {'numRows': 1, 'format': 'jsonv2', 'partitionInfo': [{'rowCount': 1, 'uncompressedSize': 57}], 'rowType': [{'name': 'multiple statement execution', 'database': '', 'schema': '', 'table': '', 'nullable': False, 'length': 16777216, 'type': 'text', 'scale': None, 'precision': None, 'byteLength': 16777216, 'collation': None}]}, 'data': [['Multiple statements executed successfully.']], 'code': '090001', 'statementHandles': ['01ba52bf-0103-fe23-0000-8c0d3c1dcb42', '01ba52bf-0103-fe23-0000-8c0d3c1dcb4a'], 'statementStatusUrl': '/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e?requestId=a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'requestId': 'a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'sqlState': '00000', 'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'message': 'Statement executed successfully.', 'createdOn': 1739285703540}
   [2025-02-11, 14:55:55 UTC] {taskinstance.py:340} ▼ Post task execution logs
   [2025-02-11, 14:55:55 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS.
   ```
   
   Finally, I want to note that long-running jobs should of course use the 
deferrable flag, but as long as the non-deferrable option remains available, 
its logic should still function correctly.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

