This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1f13f261447 Remove returns in final clause of athena hooks (#43426)
1f13f261447 is described below
commit 1f13f261447b9c5239b86d706d7c4f715a644395
Author: yangyulely <[email protected]>
AuthorDate: Mon Oct 28 18:43:16 2024 +0800
Remove returns in final clause of athena hooks (#43426)
---
.../airflow/providers/amazon/aws/hooks/athena.py | 40 ++++++++++++++--------
providers/tests/amazon/aws/hooks/test_athena.py | 21 ++++++++++++
2 files changed, 46 insertions(+), 15 deletions(-)
diff --git a/providers/src/airflow/providers/amazon/aws/hooks/athena.py
b/providers/src/airflow/providers/amazon/aws/hooks/athena.py
index 4969f339dba..60405a86399 100644
--- a/providers/src/airflow/providers/amazon/aws/hooks/athena.py
+++ b/providers/src/airflow/providers/amazon/aws/hooks/athena.py
@@ -155,14 +155,15 @@ class AthenaHook(AwsBaseHook):
state = None
try:
state = response["QueryExecution"]["Status"]["State"]
- except Exception:
- self.log.exception(
- "Exception while getting query state. Query execution id: %s",
query_execution_id
- )
- finally:
+ except Exception as e:
# The error is being absorbed here and is being handled by the
caller.
# The error is being absorbed to implement retries.
- return state
+ self.log.exception(
+ "Exception while getting query state. Query execution id: %s,
Exception: %s",
+ query_execution_id,
+ e,
+ )
+ return state
def get_state_change_reason(self, query_execution_id: str, use_cache: bool
= False) -> str | None:
"""
@@ -177,15 +178,15 @@ class AthenaHook(AwsBaseHook):
reason = None
try:
reason = response["QueryExecution"]["Status"]["StateChangeReason"]
- except Exception:
+ except Exception as e:
+ # The error is being absorbed here and is being handled by the
caller.
+ # The error is being absorbed to implement retries.
self.log.exception(
- "Exception while getting query state change reason. Query
execution id: %s",
+ "Exception while getting query state change reason. Query
execution id: %s, Exception: %s",
query_execution_id,
+ e,
)
- finally:
- # The error is being absorbed here and is being handled by the
caller.
- # The error is being absorbed to implement retries.
- return reason
+ return reason
def get_query_results(
self, query_execution_id: str, next_token_id: str | None = None,
max_results: int = 1000
@@ -287,9 +288,18 @@ class AthenaHook(AwsBaseHook):
)
except AirflowException as error:
# this function does not raise errors to keep previous behavior.
- self.log.warning(error)
- finally:
- return self.check_query_status(query_execution_id)
+ self.log.warning(
+ "AirflowException while polling query status. Query execution
id: %s, Exception: %s",
+ query_execution_id,
+ error,
+ )
+ except Exception as e:
+ self.log.warning(
+ "Unexpected exception while polling query status. Query
execution id: %s, Exception: %s",
+ query_execution_id,
+ e,
+ )
+ return self.check_query_status(query_execution_id)
def get_output_location(self, query_execution_id: str) -> str:
"""
diff --git a/providers/tests/amazon/aws/hooks/test_athena.py
b/providers/tests/amazon/aws/hooks/test_athena.py
index 3262bb473a0..5e75cf3d09b 100644
--- a/providers/tests/amazon/aws/hooks/test_athena.py
+++ b/providers/tests/amazon/aws/hooks/test_athena.py
@@ -201,6 +201,15 @@ class TestAthenaHook:
mock_conn.return_value.get_query_execution.assert_called_once()
assert result == "RUNNING"
+ @mock.patch.object(AthenaHook, "get_conn")
+ def test_hook_poll_query_with_exception(self, mock_conn):
+ mock_conn.return_value.get_query_execution.return_value =
MOCK_QUERY_EXECUTION_OUTPUT
+ result = self.athena.poll_query_status(
+ query_execution_id=MOCK_DATA["query_execution_id"],
max_polling_attempts=1, sleep_time=0
+ )
+ mock_conn.return_value.get_query_execution.assert_called_once()
+ assert not result
+
@mock.patch.object(AthenaHook, "get_conn")
def test_hook_get_output_location(self, mock_conn):
mock_conn.return_value.get_query_execution.return_value =
MOCK_QUERY_EXECUTION_OUTPUT
@@ -230,6 +239,18 @@ class TestAthenaHook:
self.athena.get_output_location(query_execution_id="PLACEHOLDER")
assert "Error retrieving OutputLocation" in caplog.text
+ @mock.patch.object(AthenaHook, "get_query_info")
+ def test_check_query_status_normal(self, mock_get_query_info):
+ mock_get_query_info.return_value = MOCK_SUCCEEDED_QUERY_EXECUTION
+ state =
self.athena.check_query_status(query_execution_id=MOCK_DATA["query_execution_id"])
+ assert state == "SUCCEEDED"
+
+ @mock.patch.object(AthenaHook, "get_query_info")
+ def test_check_query_status_exception(self, mock_get_query_info):
+ mock_get_query_info.return_value = MOCK_QUERY_EXECUTION_OUTPUT
+ state =
self.athena.check_query_status(query_execution_id=MOCK_DATA["query_execution_id"])
+ assert not state
+
@mock.patch.object(AthenaHook, "get_conn")
def test_hook_get_query_info_caching(self, mock_conn):
mock_conn.return_value.get_query_execution.return_value =
MOCK_QUERY_EXECUTION_OUTPUT