Taragolis commented on code in PR #38079:
URL: https://github.com/apache/airflow/pull/38079#discussion_r1525534431


##########
airflow/providers/slack/transfers/sql_to_slack.py:
##########
@@ -134,6 +143,15 @@ def execute(self, context: Context) -> None:
             output_file_name = fp.name
             output_file_format = output_file_format.upper()
             df_result = self._get_query_results()
+            if df_result.empty:
+                if self.action_on_empty_df == "skip":
+                    raise AirflowSkipException("Sql output df is empty. Skipping.")
+                elif self.action_on_empty_df == "error":
+                    raise ValueError("Sql output df must be non-empty. Failing.")
+                elif self.action_on_empty_df == "send":
+                    pass
+                else:

Review Comment:
   ```suggestion
                   elif self.action_on_empty_df != "send":
   ```



##########
tests/providers/slack/transfers/test_sql_to_slack.py:
##########
@@ -157,6 +157,73 @@ def test_unsupported_format(self, filename):
         with pytest.raises(ValueError):
             op.execute(mock.MagicMock())
 
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_sending_empty_file_by_default(self, 
mock_get_query_results, mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        op.execute(mock.MagicMock)
+        mock_slack_hook_cls.assert_called_once()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_skip_sending_file(self, mock_get_query_results, 
mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "skip",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(AirflowSkipException):
+            op.execute(mock.MagicMock())
+            mock_slack_hook_cls.assert_not_called()

Review Comment:
   ```suggestion
               op.execute(mock.MagicMock())
           mock_slack_hook_cls.assert_not_called()
   ```



##########
airflow/providers/slack/transfers/sql_to_slack.py:
##########
@@ -134,6 +143,15 @@ def execute(self, context: Context) -> None:
             output_file_name = fp.name
             output_file_format = output_file_format.upper()
             df_result = self._get_query_results()
+            if df_result.empty:
+                if self.action_on_empty_df == "skip":
+                    raise AirflowSkipException("Sql output df is empty. Skipping.")
+                elif self.action_on_empty_df == "error":
+                    raise ValueError("Sql output df must be non-empty. Failing.")

Review Comment:
   ```suggestion
                       raise ValueError("SQL output df must be non-empty. Failing.")
   ```



##########
airflow/providers/slack/transfers/sql_to_slack.py:
##########
@@ -134,6 +143,15 @@ def execute(self, context: Context) -> None:
             output_file_name = fp.name
             output_file_format = output_file_format.upper()
             df_result = self._get_query_results()
+            if df_result.empty:
+                if self.action_on_empty_df == "skip":
+                    raise AirflowSkipException("Sql output df is empty. Skipping.")

Review Comment:
   ```suggestion
                       raise AirflowSkipException("SQL output df is empty. Skipping.")
   ```



##########
tests/providers/slack/transfers/test_sql_to_slack.py:
##########
@@ -157,6 +157,73 @@ def test_unsupported_format(self, filename):
         with pytest.raises(ValueError):
             op.execute(mock.MagicMock())
 
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_sending_empty_file_by_default(self, 
mock_get_query_results, mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        op.execute(mock.MagicMock)
+        mock_slack_hook_cls.assert_called_once()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_skip_sending_file(self, mock_get_query_results, 
mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "skip",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(AirflowSkipException):
+            op.execute(mock.MagicMock())
+            mock_slack_hook_cls.assert_not_called()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_raise_error(self, mock_get_query_results, 
mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "error",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(ValueError):

Review Comment:
   ```suggestion
           with pytest.raises(ValueError, match="output df must be non-empty\. Failing"):
   ```



##########
tests/providers/slack/transfers/test_sql_to_slack.py:
##########
@@ -157,6 +157,73 @@ def test_unsupported_format(self, filename):
         with pytest.raises(ValueError):
             op.execute(mock.MagicMock())
 
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_sending_empty_file_by_default(self, 
mock_get_query_results, mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        op.execute(mock.MagicMock)
+        mock_slack_hook_cls.assert_called_once()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_skip_sending_file(self, mock_get_query_results, 
mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "skip",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(AirflowSkipException):
+            op.execute(mock.MagicMock())
+            mock_slack_hook_cls.assert_not_called()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    
@mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_raise_error(self, mock_get_query_results, 
mock_slack_hook_cls):
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "error",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+
+        # Mock empty query results
+        mock_df = mock.MagicMock()
+        mock_df.configure_mock(**{"empty.return_value": True})
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(ValueError):
+            op.execute(mock.MagicMock())
+            mock_slack_hook_cls.assert_not_called()

Review Comment:
   ```suggestion
           mock_slack_hook_cls.assert_not_called()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to