This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d8381ed250 Update SqlToSlackApiFileOperator with new param to check empty output (#38079)
d8381ed250 is described below

commit d8381ed2508a6129142e5717b800bb4bad7a6a30
Author: Tianyou Gu <[email protected]>
AuthorDate: Mon Mar 18 10:49:54 2024 -0700

    Update SqlToSlackApiFileOperator with new param to check empty output (#38079)
    
    * fix: Update SqlToSlackApiFileOperator with new param to check empty output
    
    * fix: skip sending slack instead of raising exception
    
    * fix: update param to allow different ways to handle an empty df
    
    * Apply suggestions from code review
    
    fmt: make formatting changes
    
    Co-authored-by: Andrey Anshin <[email protected]>
    
    ---------
    
    Co-authored-by: Andrey Anshin <[email protected]>
---
 airflow/providers/slack/transfers/sql_to_slack.py  | 18 +++++-
 .../providers/slack/transfers/test_sql_to_slack.py | 69 +++++++++++++++++++++-
 2 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/slack/transfers/sql_to_slack.py 
b/airflow/providers/slack/transfers/sql_to_slack.py
index 0ecfc4e8ca..cd19fa9a74 100644
--- a/airflow/providers/slack/transfers/sql_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any, Mapping, Sequence
 from deprecated import deprecated
 from typing_extensions import Literal
 
-from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning, AirflowSkipException
 from airflow.providers.slack.hooks.slack import SlackHook
 from airflow.providers.slack.transfers.base_sql_to_slack import 
BaseSqlToSlackOperator
 from airflow.providers.slack.transfers.sql_to_slack_webhook import 
SqlToSlackWebhookOperator
@@ -58,6 +58,11 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
     :param slack_base_url: A string representing the Slack API base URL. 
Optional
     :param slack_method_version: The version of the Slack SDK Client method to 
be used, either "v1" or "v2".
     :param df_kwargs: Keyword arguments forwarded to 
``pandas.DataFrame.to_{format}()`` method.
+    :param action_on_empty_df: Specifying how to handle an empty sql output 
df. Possible values:
+
+        - ``send``: (default) send the slack with an empty file.
+        - ``skip``: skip sending the slack message. Task state set to 
"skipped".
+        - ``error``: raise an error to fail the task. Task state set to 
"failed".
     """
 
     template_fields: Sequence[str] = (
@@ -87,6 +92,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
         slack_base_url: str | None = None,
         slack_method_version: Literal["v1", "v2"] = "v1",
         df_kwargs: dict | None = None,
+        action_on_empty_df: Literal["send", "skip", "error"] = "send",
         **kwargs,
     ):
         super().__init__(
@@ -100,6 +106,9 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
         self.slack_base_url = slack_base_url
         self.slack_method_version = slack_method_version
         self.df_kwargs = df_kwargs or {}
+        if not action_on_empty_df or action_on_empty_df not in ("send", 
"skip", "error"):
+            raise ValueError(f"Invalid `action_on_empty_df` value 
{action_on_empty_df!r}")
+        self.action_on_empty_df = action_on_empty_df
 
     @cached_property
     def slack_hook(self):
@@ -134,6 +143,13 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
             output_file_name = fp.name
             output_file_format = output_file_format.upper()
             df_result = self._get_query_results()
+            if df_result.empty:
+                if self.action_on_empty_df == "skip":
+                    raise AirflowSkipException("SQL output df is empty. 
Skipping.")
+                elif self.action_on_empty_df == "error":
+                    raise ValueError("SQL output df must be non-empty. 
Failing.")
+                elif self.action_on_empty_df != "send":
+                    raise ValueError(f"Invalid `action_on_empty_df` value 
{self.action_on_empty_df!r}")
             if output_file_format == "CSV":
                 df_result.to_csv(output_file_name, **self.df_kwargs)
             elif output_file_format == "JSON":
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py 
b/tests/providers/slack/transfers/test_sql_to_slack.py
index fd3c06189d..8db15cae66 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -20,7 +20,7 @@ from unittest import mock
 
 import pytest
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowProviderDeprecationWarning, 
AirflowSkipException
 from airflow.providers.slack.transfers.sql_to_slack import 
SqlToSlackApiFileOperator, SqlToSlackOperator
 from airflow.utils import timezone
 
@@ -157,6 +157,73 @@ class TestSqlToSlackApiFileOperator:
         with pytest.raises(ValueError):
             op.execute(mock.MagicMock())
 
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_sending_empty_file_by_default(self, mock_get_query_results, mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results (`DataFrame.empty` is a property, not a method)
+        mock_df = mock.MagicMock()
+        mock_df.empty = True
+        mock_get_query_results.return_value = mock_df
+
+        op.execute(mock.MagicMock())
+        mock_slack_hook_cls.assert_called_once()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_skip_sending_file(self, mock_get_query_results, mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "skip",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results (`DataFrame.empty` is a property, not a method)
+        mock_df = mock.MagicMock()
+        mock_df.empty = True
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(AirflowSkipException):
+            op.execute(mock.MagicMock())
+        mock_slack_hook_cls.assert_not_called()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_raise_error(self, mock_get_query_results, mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "error",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results (`DataFrame.empty` is a property, not a method)
+        mock_df = mock.MagicMock()
+        mock_df.empty = True
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(ValueError, match=r"output df must be non-empty\. Failing"):
+            op.execute(mock.MagicMock())
+        mock_slack_hook_cls.assert_not_called()
+
 
 def test_deprecated_sql_to_slack_operator():
     warning_pattern = "SqlToSlackOperator` has been renamed and moved"

Reply via email to