This is an automated email from the ASF dual-hosted git repository. elizabeth pushed a commit to branch elizabeth/slackv2 in repository https://gitbox.apache.org/repos/asf/superset.git
commit 745b75db665b9f2e03d6ce070d885ca80326f49f Author: Elizabeth Thompson <[email protected]> AuthorDate: Thu Jun 13 09:39:03 2024 -0700 add slackv2 notification --- .../superset-ui-core/src/utils/featureFlags.ts | 1 + .../src/features/alerts/AlertReportModal.test.tsx | 2 +- .../alerts/components/NotificationMethod.tsx | 163 ++++++++++++++++++--- .../features/alerts/components/RecipientIcon.tsx | 4 + superset-frontend/src/features/alerts/types.ts | 3 +- superset/commands/report/execute.py | 68 ++++++++- superset/config.py | 1 + superset/reports/api.py | 67 ++++++++- superset/reports/models.py | 1 + superset/reports/notifications/__init__.py | 7 + superset/reports/notifications/exceptions.py | 8 + superset/reports/notifications/slack.py | 132 +++++------------ superset/reports/notifications/slack_mixin.py | 116 +++++++++++++++ superset/reports/notifications/slackv2.py | 133 +++++++++++++++++ superset/reports/schemas.py | 6 + superset/tasks/cron_util.py | 9 +- superset/tasks/scheduler.py | 36 +++-- superset/utils/slack.py | 56 +++++++ superset/views/base.py | 1 + 19 files changed, 673 insertions(+), 141 deletions(-) diff --git a/superset-frontend/packages/superset-ui-core/src/utils/featureFlags.ts b/superset-frontend/packages/superset-ui-core/src/utils/featureFlags.ts index b3af431d52..8ffc4f845d 100644 --- a/superset-frontend/packages/superset-ui-core/src/utils/featureFlags.ts +++ b/superset-frontend/packages/superset-ui-core/src/utils/featureFlags.ts @@ -25,6 +25,7 @@ export enum FeatureFlag { AlertsAttachReports = 'ALERTS_ATTACH_REPORTS', AlertReports = 'ALERT_REPORTS', AlertReportTabs = 'ALERT_REPORT_TABS', + AlertReportSlackV2 = 'ALERT_REPORT_SLACK_V2', AllowFullCsvExport = 'ALLOW_FULL_CSV_EXPORT', AvoidColorsCollision = 'AVOID_COLORS_COLLISION', ChartPluginsExperimental = 'CHART_PLUGINS_EXPERIMENTAL', diff --git a/superset-frontend/src/features/alerts/AlertReportModal.test.tsx b/superset-frontend/src/features/alerts/AlertReportModal.test.tsx index e32d13ab63..17047b7a40 100644 --- a/superset-frontend/src/features/alerts/AlertReportModal.test.tsx +++ b/superset-frontend/src/features/alerts/AlertReportModal.test.tsx @@ -30,7 +30,7 @@ jest.mock('@superset-ui/core', () => ({ jest.mock('src/features/databases/state.ts', () => ({ useCommonConf: () => ({ - ALERT_REPORTS_NOTIFICATION_METHODS: ['Email', 'Slack'], + ALERT_REPORTS_NOTIFICATION_METHODS: ['Email', 'Slack', 'SlackV2'], }), })); diff --git a/superset-frontend/src/features/alerts/components/NotificationMethod.tsx b/superset-frontend/src/features/alerts/components/NotificationMethod.tsx index b2d780423b..8949a83115 100644 --- a/superset-frontend/src/features/alerts/components/NotificationMethod.tsx +++ b/superset-frontend/src/features/alerts/components/NotificationMethod.tsx @@ -16,13 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -import { FunctionComponent, useState, ChangeEvent } from 'react'; +import { + FunctionComponent, + useState, + ChangeEvent, + useEffect, + useMemo, +} from 'react'; -import { styled, t, useTheme } from '@superset-ui/core'; -import { Select } from 'src/components'; +import { + FeatureFlag, + SupersetClient, + isFeatureEnabled, + styled, + t, + useTheme, +} from '@superset-ui/core'; +import rison from 'rison'; +import { AsyncSelect, Select } from 'src/components'; import Icons from 'src/components/Icons'; import { NotificationMethodOption, NotificationSetting } from '../types'; import { StyledInputContainer } from '../AlertReportModal'; +import { set } from 'lodash'; +import { option } from 'yargs'; const StyledNotificationMethod = styled.div` margin-bottom: 10px; @@ -87,20 +103,92 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({ const [recipientValue, setRecipientValue] = useState<string>( recipients || '', ); + const [slackRecipients, setSlackRecipients] = useState< + { label: string; value: string }[] + >([]); const [error, setError] = useState(false); const theme = useTheme(); + const [useSlackV1, setUseSlackV1] = useState<boolean>(false); + + const mapChannelsToOptions = (result: { name: any; id: any }[]) => + result.map((result: { name: any; id: any }) => ({ + label: result.name, + value: result.id, + })); + + const loadChannels = async ( + search_string: string | undefined = '', + ): Promise<{ + data?: { label: any; value: any }[]; + totalCount?: number; + }> => { + const query = rison.encode({ search_string }); + const endpoint = `/api/v1/report/slack_channels?q=${query}`; + return SupersetClient.get({ endpoint }) + .then(({ json }) => { + const { result, count } = json; + + const options: { label: any; value: any }[] = + mapChannelsToOptions(result); + + return { + data: options, + totalCount: (count ?? options.length) as number, + }; + }) + .catch(() => { + // Fallback to slack v1 if slack v2 is not compatible + setUseSlackV1(true); + return {}; + }); + }; + + useEffect(() => { + // fetch slack channel names from + // ids on first load + if (method && ['Slack', 'SlackV2'].includes(method)) { + loadChannels(recipients).then(response => { + setSlackRecipients(response.data || []); + onMethodChange({ label: 'Slack', value: 'SlackV2' }); + }); + } + }, []); + + const formattedOptions = useMemo( + () => + (options || []) + .filter( + method => + (isFeatureEnabled(FeatureFlag.AlertReportSlackV2) && + !useSlackV1 && + method === 'SlackV2') || + ((!isFeatureEnabled(FeatureFlag.AlertReportSlackV2) || + useSlackV1) && + method === 'Slack') || + method === 'Email', + ) + .map(method => ({ + label: method === 'SlackV2' ? 'Slack' : method, + value: method, + })), + [options], + ); + if (!setting) { return null; } - const onMethodChange = (method: NotificationMethodOption) => { + const onMethodChange = (selected: { + label: string; + value: NotificationMethodOption; + }) => { // Since we're swapping the method, reset the recipients setRecipientValue(''); if (onUpdate) { const updatedSetting = { ...setting, - method, + method: selected.value, recipients: '', }; @@ -123,6 +211,21 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({ } }; + const onSlackRecipientsChange = ( + recipients: { label: string; value: string }[], + ) => { + setSlackRecipients(recipients); + + if (onUpdate) { + const updatedSetting = { + ...setting, + recipients: recipients?.map(obj => obj.value).join(','), + }; + + onUpdate(index, updatedSetting); + } + }; + const onSubjectChange = ( event: ChangeEvent<HTMLTextAreaElement | HTMLInputElement>, ) => { @@ -153,15 +256,12 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({ <Select ariaLabel={t('Delivery method')} data-test="select-delivery-method" + labelInValue onChange={onMethodChange} placeholder={t('Select Delivery Method')} - options={(options || []).map( - (method: NotificationMethodOption) => ({ - label: method, - value: method, - }), - )} - value={method} + options={formattedOptions} + showSearch + value={formattedOptions.find(option => option.value === method)} /> {index !== 0 && !!onRemove ? ( <span @@ -211,19 +311,36 @@ export const NotificationMethod: FunctionComponent<NotificationMethodProps> = ({ <div className="inline-container"> <StyledInputContainer> <div className="control-label"> - {t('%s recipients', method)} + {t('%s recipients', method === 'SlackV2' ? 'Slack' : method)} <span className="required">*</span> </div> - <div className="input-container"> - <textarea - name="recipients" - data-test="recipients" - value={recipientValue} - onChange={onRecipientsChange} - /> - </div> - <div className="helper"> - {t('Recipients are separated by "," or ";"')} + <div> + {['Email', 'Slack'].includes(method) ? ( + <> + <div className="input-container"> + <textarea + name="recipients" + data-test="recipients" + value={recipientValue} + onChange={onRecipientsChange} + /> + </div> + <div className="helper"> + {t('Recipients are separated by "," or ";"')} + </div> + </> + ) : ( + // for SlackV2 + <AsyncSelect + ariaLabel={t('Select owners')} + mode="multiple" + name="owners" + value={slackRecipients} + options={loadChannels} + onChange={onSlackRecipientsChange} + allowClear + /> + )} </div> </StyledInputContainer> </div> diff --git a/superset-frontend/src/features/alerts/components/RecipientIcon.tsx b/superset-frontend/src/features/alerts/components/RecipientIcon.tsx index c1c1127cb9..bcaedf658d 100644 --- a/superset-frontend/src/features/alerts/components/RecipientIcon.tsx +++ b/superset-frontend/src/features/alerts/components/RecipientIcon.tsx @@ -41,6 +41,10 @@ export default function RecipientIcon({ type }: { type: string }) { recipientIconConfig.icon = <Icons.Slack css={StyledIcon} />; recipientIconConfig.label = RecipientIconName.Slack; break; + case RecipientIconName.SlackV2: + recipientIconConfig.icon = <Icons.Slack css={StyledIcon} />; + recipientIconConfig.label = RecipientIconName.Slack; + break; default: recipientIconConfig.icon = null; recipientIconConfig.label = ''; diff --git a/superset-frontend/src/features/alerts/types.ts b/superset-frontend/src/features/alerts/types.ts index 932a744663..8faf7c4af6 100644 --- a/superset-frontend/src/features/alerts/types.ts +++ b/superset-frontend/src/features/alerts/types.ts @@ -41,7 +41,7 @@ export type DatabaseObject = { id: number; }; -export type NotificationMethodOption = 'Email' | 'Slack'; +export type NotificationMethodOption = 'Email' | 'Slack' | 'SlackV2'; export type NotificationSetting = { method?: NotificationMethodOption; @@ -124,6 +124,7 @@ export enum AlertState { export enum RecipientIconName { Email = 'Email', Slack = 'Slack', + SlackV2 = 'SlackV2', } export interface AlertsReportsConfig { ALERT_REPORTS_DEFAULT_WORKING_TIMEOUT: number; diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py index 1540fa70d8..72599c9a05 100644 --- a/superset/commands/report/execute.py +++ b/superset/commands/report/execute.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import logging +from copy import deepcopy from datetime import datetime, timedelta from typing import Any, Optional, Union from uuid import UUID @@ -25,7 +26,7 @@ from celery.exceptions import SoftTimeLimitExceeded from superset import app, db, security_manager from superset.commands.base import BaseCommand from superset.commands.dashboard.permalink.create import CreateDashboardPermalinkCommand -from superset.commands.exceptions import CommandException +from superset.commands.exceptions import CommandException, UpdateFailedError from superset.commands.report.alert import AlertCommand from superset.commands.report.exceptions import ( ReportScheduleAlertGracePeriodError, @@ -64,7 +65,10 @@ from superset.reports.models import ( ) from superset.reports.notifications import create_notification from superset.reports.notifications.base import NotificationContent -from superset.reports.notifications.exceptions import NotificationError +from superset.reports.notifications.exceptions import ( + NotificationError, + SlackV1NotificationError, +) from superset.tasks.utils import get_executor from superset.utils import json from superset.utils.core import HeaderDataType, override_user @@ -72,6 +76,7 @@ from superset.utils.csv import get_chart_csv_data, get_chart_dataframe from superset.utils.decorators import logs_context from superset.utils.pdf import build_pdf_from_screenshots from superset.utils.screenshots import ChartScreenshot, DashboardScreenshot +from superset.utils.slack import get_paginated_channels_with_search from superset.utils.urls import get_url_path logger = logging.getLogger(__name__) @@ -122,6 +127,36 @@ class BaseReportState: self._report_schedule.last_eval_dttm = datetime.utcnow() db.session.commit() + def update_report_schedule_slack_v2(self) -> None: + """ + Update the report schedule type and channels for all slack recipients to v2. + V2 uses ids instead of names for channels. + """ + try: + updated_recipients = [] + for recipient in self._report_schedule.recipients: + recipient_copy = deepcopy(recipient) + if recipient_copy.type == ReportRecipientType.SLACK: + recipient_copy.type = ReportRecipientType.SLACKV2 + slack_recipients = json.loads(recipient_copy.recipient_config_json) + recipient_copy.recipient_config_json = json.dumps( + { + "target": get_paginated_channels_with_search( + slack_recipients["target"] + ) + } + ) + + updated_recipients.append(recipient_copy) + logger.warning("recipient_copy is: %s", recipient_copy) + db.session.commit() + logger.warning("recipients updated to v2: %s", updated_recipients) + except Exception as ex: + logger.warning( + "Failed to update slack recipients to v2: %s", str(ex), exc_info=1 + ) + raise UpdateFailedError from ex + def create_log(self, error_message: Optional[str] = None) -> None: """ Creates a Report execution log, uses the current computed last_value for Alerts @@ -440,6 +475,28 @@ class BaseReportState: ) else: notification.send() + except SlackV1NotificationError as ex: + # The slack api was tested and it failed + # The slack notification should be sent with the v2 api + logger.warning( + "Attempting to upgrade the report to Slackv2: %s", str(ex) + ) + try: + self.update_report_schedule_slack_v2() + recipient.type = ReportRecipientType.SLACKV2 + notification = create_notification(recipient, notification_content) + notification.send() + except UpdateFailedError as ex: + # log the error but keep processing + # it will keep trying as it runs + # and then during a majore version we will run a migration and update it again + # and remove the old slack version code + logger.warning( + "Failed to update slack recipients to v2: %s", str(ex) + ) + except (NotificationError, SupersetException) as ex: + # if the send command errors, catch it in the next block + raise ex except (NotificationError, SupersetException) as ex: # collect errors but keep processing them notification_errors.append( @@ -478,9 +535,10 @@ class BaseReportState: """ header_data = self._get_log_data() logger.info( - "header_data in notifications for alerts and reports %s, taskid, %s", + "header_data in notifications for alerts and reports %s, taskid, %s, recipients %s", header_data, self._execution_id, + json.loads(self._report_schedule.recipients.recipient_config_json), ) notification_content = NotificationContent( name=name, text=message, header_data=header_data @@ -750,7 +808,9 @@ class AsyncExecuteReportScheduleCommand(BaseCommand): self._execution_id, ) self._model = ( - db.session.query(ReportSchedule).filter_by(id=self._model_id).one_or_none() + db.session.query(ReportSchedule) + .filter_by(id=self._model_id[0]) + .one_or_none() ) if not self._model: raise ReportScheduleNotFoundError() diff --git a/superset/config.py b/superset/config.py index cb77982994..1178a30260 100644 --- a/superset/config.py +++ b/superset/config.py @@ -444,6 +444,7 @@ DEFAULT_FEATURE_FLAGS: dict[str, bool] = { # Enables Alerts and reports new implementation "ALERT_REPORTS": False, "ALERT_REPORT_TABS": False, + "ALERT_REPORTS_SLACK_V2": False, "DASHBOARD_RBAC": False, "ENABLE_ADVANCED_DATA_TYPES": False, # Enabling ALERTS_ATTACH_REPORTS, the system sends email and slack message diff --git a/superset/reports/api.py b/superset/reports/api.py index 4a298b564d..04199240fa 100644 --- a/superset/reports/api.py +++ b/superset/reports/api.py @@ -40,15 +40,18 @@ from superset.commands.report.update import UpdateReportScheduleCommand from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod from superset.dashboards.filters import DashboardAccessFilter from superset.databases.filters import DatabaseFilter +from superset.exceptions import SupersetException from superset.extensions import event_logger from superset.reports.filters import ReportScheduleAllTextFilter, ReportScheduleFilter from superset.reports.models import ReportSchedule from superset.reports.schemas import ( get_delete_ids_schema, + get_slack_channels_schema, openapi_spec_methods_override, ReportSchedulePostSchema, ReportSchedulePutSchema, ) +from superset.utils.slack import get_paginated_channels_with_search from superset.views.base_api import ( BaseSupersetModelRestApi, RelatedFieldFilter, @@ -71,7 +74,8 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi): include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | { RouteMethod.RELATED, - "bulk_delete", # not using RouteMethod since locally defined + "bulk_delete", + "slack_channels", # not using RouteMethod since locally defined } class_permission_name = "ReportSchedule" method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP @@ -513,3 +517,64 @@ class ReportScheduleRestApi(BaseSupersetModelRestApi): return self.response_403() except ReportScheduleDeleteFailedError as ex: return self.response_422(message=str(ex)) + + @expose("/slack_channels/", methods=("GET",)) + @protect() + @rison(get_slack_channels_schema) + @permission_name("post") + @safe + @statsd_metrics + @event_logger.log_this_with_context( + action=lambda self, + *args, + **kwargs: f"{self.__class__.__name__}.slack_channels", + log_to_statsd=False, + ) + def slack_channels(self, **kwargs) -> Response: + """Get slack channels. + --- + get: + summary: Get slack channels + description: Get slack channels + parameters: + - in: query + name: q + content: + application/json: + schema: + $ref: '#/components/schemas/get_slack_channels_schema' + responses: + 200: + description: Slack channels + content: + application/json: + schema: + type: object + properties: + result: + type: array + items: + type: object + properties: + id: + type: string + name: + type: string + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + try: + search_string = kwargs.get("rison", {}).get("search_string") + channels = get_paginated_channels_with_search(search_string=search_string) + return self.response(200, result=channels) + except SupersetException as ex: + logger.error("Error fetching slack channels %s", str(ex)) + return self.response_422(message=str(ex)) diff --git a/superset/reports/models.py b/superset/reports/models.py index 3627a2ebf4..e4cdd7c9b4 100644 --- a/superset/reports/models.py +++ b/superset/reports/models.py @@ -62,6 +62,7 @@ class ReportScheduleValidatorType(StrEnum): class ReportRecipientType(StrEnum): EMAIL = "Email" SLACK = "Slack" + SLACKV2 = "SlackV2" class ReportState(StrEnum): diff --git a/superset/reports/notifications/__init__.py b/superset/reports/notifications/__init__.py index 770ce43e21..9de8f0bb09 100644 --- a/superset/reports/notifications/__init__.py +++ b/superset/reports/notifications/__init__.py @@ -14,10 +14,15 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import logging + from superset.reports.models import ReportRecipients from superset.reports.notifications.base import BaseNotification, NotificationContent from superset.reports.notifications.email import EmailNotification # noqa: F401 from superset.reports.notifications.slack import SlackNotification # noqa: F401 +from superset.reports.notifications.slackv2 import SlackV2Notification # noqa: F401 + +logger = logging.getLogger(__name__) def create_notification( @@ -28,6 +33,8 @@ def create_notification( Returns the Notification class for the recipient type """ for plugin in BaseNotification.plugins: + logger.warning(f"plugin.type: {plugin.type}") + logger.warning(f"recipient.type: {recipient.type}") if plugin.type == recipient.type: return plugin(recipient, notification_content) raise Exception( # pylint: disable=broad-exception-raised diff --git a/superset/reports/notifications/exceptions.py b/superset/reports/notifications/exceptions.py index aa06906f82..9cf87ed71e 100644 --- a/superset/reports/notifications/exceptions.py +++ b/superset/reports/notifications/exceptions.py @@ -24,6 +24,14 @@ class NotificationError(SupersetException): """ +class SlackV1NotificationError(SupersetException): + """ + Report should not be run with the slack v1 api + """ + + status = 422 + + class NotificationParamException(SupersetException): status = 422 diff --git a/superset/reports/notifications/slack.py b/superset/reports/notifications/slack.py index 6a6bbda7ba..258705bb29 100644 --- a/superset/reports/notifications/slack.py +++ b/superset/reports/notifications/slack.py @@ -17,14 +17,12 @@ import logging from collections.abc import Sequence from io import IOBase -from typing import List, Union +from typing import Union import backoff import pandas as pd -from deprecation import deprecated from flask import g from flask_babel import gettext as __ -from slack_sdk import WebClient from slack_sdk.errors import ( BotUserAccessError, SlackApiError, @@ -43,11 +41,15 @@ from superset.reports.notifications.exceptions import ( NotificationMalformedException, NotificationParamException, NotificationUnprocessableException, + SlackV1NotificationError, ) from superset.utils import json from superset.utils.core import get_email_address_list from superset.utils.decorators import statsd_gauge -from superset.utils.slack import get_slack_client +from superset.utils.slack import ( + get_slack_client, + should_use_v2_api, +) logger = logging.getLogger(__name__) @@ -62,25 +64,16 @@ class SlackNotification(BaseNotification): # pylint: disable=too-few-public-met type = ReportRecipientType.SLACK - def _get_channels(self, client: WebClient) -> List[str]: + def _get_channel(self) -> str: """ Get the recipient's channel(s). - :returns: A list of channel ids: "EID676L" - :raises SlackApiError: If the API call fails + Note Slack SDK uses "channel" to refer to one or more + channels. Multiple channels are demarcated by a comma. + :returns: The comma separated list of channel(s) """ recipient_str = json.loads(self._recipient.recipient_config_json)["target"] - channel_recipients: List[str] = get_email_address_list(recipient_str) - - conversations_list_response = client.conversations_list( - types="public_channel,private_channel" - ) - - return [ - c["id"] - for c in conversations_list_response["channels"] - if c["name"] in channel_recipients - ] + return ",".join(get_email_address_list(recipient_str)) def _message_template(self, table: str = "") -> str: return __( @@ -126,19 +119,15 @@ Error: %(text)s # Flatten columns/index so they show up nicely in the table df.columns = [ - ( - " ".join(str(name) for name in column).strip() - if isinstance(column, tuple) - else column - ) + " ".join(str(name) for name in column).strip() + if isinstance(column, tuple) + else column for column in df.columns ] df.index = [ - ( - " ".join(str(name) for name in index).strip() - if isinstance(index, tuple) - else index - ) + " ".join(str(name) for name in index).strip() + if isinstance(index, tuple) + else index for index in df.index ] @@ -177,38 +166,33 @@ Error: %(text)s def _get_inline_files( self, - ) -> Sequence[Union[str, IOBase, bytes]]: - if self._content.csv: - return [self._content.csv] - if self._content.screenshots: - return self._content.screenshots - if self._content.pdf: - return [self._content.pdf] - return [] - - @deprecated(deprecated_in="4.1") - def _deprecated_upload_files( - self, client: WebClient, title: str, body: str - ) -> None: - """ - Deprecated method to upload files to slack - Should only be used if the new method fails - To be removed in the next major release - """ - file_type, files = (None, []) + ) -> tuple[Union[str, None], Sequence[Union[str, IOBase, bytes]]]: if self._content.csv: - file_type, files = ("csv", [self._content.csv]) + return ("csv", [self._content.csv]) if self._content.screenshots: - file_type, files = ("png", self._content.screenshots) + return ("png", self._content.screenshots) if self._content.pdf: - file_type, files = ("pdf", [self._content.pdf]) + return ("pdf", [self._content.pdf]) + return (None, []) - recipient_str = json.loads(self._recipient.recipient_config_json)["target"] + @backoff.on_exception(backoff.expo, SlackApiError, factor=10, base=2, max_tries=5) + @statsd_gauge("reports.slack.send") + def send(self) -> None: + file_type, files = self._get_inline_files() + title = self._content.name + channel = self._get_channel() + body = self._get_body() + global_logs_context = getattr(g, "logs_context", {}) or {} - recipients = get_email_address_list(recipient_str) + # see if the v2 api will work + if should_use_v2_api(): + # if we can fetch channels, then raise an error and use the v2 api + raise SlackV1NotificationError - for channel in recipients: - if len(files) > 0: + try: + client = get_slack_client() + # files_upload returns SlackResponse as we run it in sync mode. + if files: for file in files: client.files_upload( channels=channel, @@ -219,46 +203,6 @@ Error: %(text)s ) else: client.chat_postMessage(channel=channel, text=body) - - @backoff.on_exception(backoff.expo, SlackApiError, factor=10, base=2, max_tries=5) - @statsd_gauge("reports.slack.send") - def send(self) -> None: - global_logs_context = getattr(g, "logs_context", {}) or {} - try: - client = get_slack_client() - title = self._content.name - body = self._get_body() - - try: - channels = self._get_channels(client) - except SlackApiError: - logger.warning( - "Slack scope missing. Using deprecated API to get channels. Please update your Slack app to use the new API.", - extra={ - "execution_id": global_logs_context.get("execution_id"), - }, - ) - self._deprecated_upload_files(client, title, body) - return - - if channels == []: - raise NotificationParamException("No valid channel found") - - files = self._get_inline_files() - - # files_upload returns SlackResponse as we run it in sync mode. - for channel in channels: - if len(files) > 0: - for file in files: - client.files_upload_v2( - channel=channel, - file=file, - initial_comment=body, - title=title, - ) - else: - client.chat_postMessage(channel=channel, text=body) - logger.info( "Report sent to slack", extra={ diff --git a/superset/reports/notifications/slack_mixin.py b/superset/reports/notifications/slack_mixin.py new file mode 100644 index 0000000000..aa6c4e1957 --- /dev/null +++ b/superset/reports/notifications/slack_mixin.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pandas as pd +from flask_babel import gettext as __ + +from superset.reports.notifications.slack import MAXIMUM_MESSAGE_SIZE + + +class SlackMixin: + def _message_template(self, table: str = "") -> str: + return __( + """*%(name)s* + + %(description)s + + <%(url)s|Explore in Superset> + + %(table)s + """, + name=self._content.name, + description=self._content.description or "", + url=self._content.url, + table=table, + ) + + @staticmethod + def _error_template(name: str, description: str, text: str) -> str: + return __( + """*%(name)s* + + %(description)s + + Error: %(text)s + """, + name=name, + description=description, + text=text, + ) + + def _get_body(self) -> str: + if self._content.text: + return self._error_template( + self._content.name, self._content.description or "", self._content.text + ) + + if self._content.embedded_data is None: + return self._message_template() + + # Embed data in the message + df = self._content.embedded_data + + # Flatten columns/index so they show up nicely in the table + df.columns = [ + ( + " ".join(str(name) for name in column).strip() + if isinstance(column, tuple) + else column + ) + for column in df.columns + ] + df.index = [ + ( + " ".join(str(name) for name in index).strip() + if isinstance(index, tuple) + else index + ) + for index in df.index + ] + + # Slack Markdown only works on messages shorter than 4k chars, so we might + # need to truncate the data + for i in range(len(df) - 1): + truncated_df = df[: i + 1].fillna("") + truncated_row = pd.Series({k: "..." for k in df.columns}) + truncated_df = pd.concat( + [truncated_df, truncated_row.to_frame().T], ignore_index=True + ) + tabulated = df.to_markdown() + table = f"```\n{tabulated}\n```\n\n(table was truncated)" + message = self._message_template(table) + if len(message) > MAXIMUM_MESSAGE_SIZE: + # Decrement i and build a message that is under the limit + truncated_df = df[:i].fillna("") + truncated_row = pd.Series({k: "..." for k in df.columns}) + truncated_df = pd.concat( + [truncated_df, truncated_row.to_frame().T], ignore_index=True + ) + tabulated = df.to_markdown() + table = ( + f"```\n{tabulated}\n```\n\n(table was truncated)" + if len(truncated_df) > 0 + else "" + ) + break + + # Send full data + else: + tabulated = df.to_markdown() + table = f"```\n{tabulated}\n```" + + return self._message_template(table) diff --git a/superset/reports/notifications/slackv2.py b/superset/reports/notifications/slackv2.py new file mode 100644 index 0000000000..644ef8056a --- /dev/null +++ b/superset/reports/notifications/slackv2.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from collections.abc import Sequence +from io import IOBase +from typing import List, Union + +import backoff +from flask import g +from slack_sdk.errors import ( + BotUserAccessError, + SlackApiError, + SlackClientConfigurationError, + SlackClientError, + SlackClientNotConnectedError, + SlackObjectFormationError, + SlackRequestError, + SlackTokenRotationError, +) + +from superset.reports.models import ReportRecipientType +from superset.reports.notifications.base import BaseNotification +from superset.reports.notifications.exceptions import ( + NotificationAuthorizationException, + NotificationMalformedException, + NotificationParamException, + NotificationUnprocessableException, +) +from superset.reports.notifications.slack_mixin import SlackMixin +from superset.utils import json +from superset.utils.core import get_email_address_list +from superset.utils.decorators import statsd_gauge +from superset.utils.slack import get_slack_client + +logger = logging.getLogger(__name__) + +# Slack only allows Markdown messages up to 4k chars +MAXIMUM_MESSAGE_SIZE = 4000 + + +class SlackV2Notification(BaseNotification, SlackMixin): # pylint: disable=too-few-public-methods + """ + Sends a slack notification for a report recipient with the slack upload v2 API + """ + + type = ReportRecipientType.SLACKV2 + + def _get_channels(self) -> List[str]: + """ + Get the recipient's channel(s). + :returns: A list of channel ids: "EID676L" + :raises NotificationParamException or SlackApiError: If the recipient is not found + """ + recipient_str = json.loads(self._recipient.recipient_config_json)["target"] + + return get_email_address_list(recipient_str) + + def _get_inline_files( + self, + ) -> Sequence[Union[str, IOBase, bytes]]: + if self._content.csv: + return [self._content.csv] + if self._content.screenshots: + return self._content.screenshots + if self._content.pdf: + return [self._content.pdf] + return [] + + @backoff.on_exception(backoff.expo, SlackApiError, factor=10, base=2, max_tries=5) + @statsd_gauge("reports.slack.send") + def send(self) -> None: + global_logs_context = getattr(g, "logs_context", {}) or {} + try: + client = get_slack_client() + title = self._content.name + body = self._get_body() + + channels = self._get_channels() + + if not channels: + raise NotificationParamException("No recipients saved in the report") + + files = self._get_inline_files() + + # files_upload returns SlackResponse as we run it in sync mode. + for channel in channels: + if len(files) > 0: + for file in files: + client.files_upload_v2( + channel=channel, + file=file, + initial_comment=body, + title=title, + ) + else: + client.chat_postMessage(channel=channel, text=body) + + logger.info( + "Report sent to slack", + extra={ + "execution_id": global_logs_context.get("execution_id"), + }, + ) + except ( + BotUserAccessError, + SlackRequestError, + SlackClientConfigurationError, + ) as ex: + raise NotificationParamException(str(ex)) from ex + except SlackObjectFormationError as ex: + raise NotificationMalformedException(str(ex)) from ex + except SlackTokenRotationError as ex: + raise NotificationAuthorizationException(str(ex)) from ex + except (SlackClientNotConnectedError, SlackApiError) as ex: + raise NotificationUnprocessableException(str(ex)) from ex + except SlackClientError as ex: + # this is the base class for all slack client errors + # keep it last so that it doesn't interfere with @backoff + raise NotificationUnprocessableException(str(ex)) from ex diff --git a/superset/reports/schemas.py b/superset/reports/schemas.py index f57a663e53..20cbff87b8 100644 --- a/superset/reports/schemas.py +++ b/superset/reports/schemas.py @@ -49,6 +49,12 @@ openapi_spec_methods_override = { } get_delete_ids_schema = {"type": "array", "items": {"type": "integer"}} +get_slack_channels_schema = { + "type": "object", + "properties": { + "seach_string": {"type": "string"}, + }, +} type_description = "The report schedule type" name_description = "The report schedule name." diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py index 329937fb82..134cb573e0 100644 --- a/superset/tasks/cron_util.py +++ b/superset/tasks/cron_util.py @@ -33,6 +33,7 @@ def cron_schedule_window( window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"] try: tz = pytz_timezone(timezone) + logger.info("Using tz %s with timezone %s", tz, timezone) except UnknownTimeZoneError: # fallback to default timezone tz = pytz_timezone("UTC") @@ -42,9 +43,15 @@ def cron_schedule_window( time_now = triggered_at.astimezone(tz) start_at = time_now - timedelta(seconds=window_size / 2) stop_at = time_now + timedelta(seconds=window_size / 2) + logger.info("Time now: %s, start at: %s, stop at: %s", time_now, start_at, stop_at) crons = croniter(cron, start_at) for schedule in crons.all_next(datetime): if schedule >= stop_at: break # convert schedule back to utc - yield schedule.astimezone(utc).replace(tzinfo=None) + logger.info( + "Yielding schedule: %s, converted: %s", + schedule, + schedule.astimezone(utc).replace(tzinfo=None), + ) + yield schedule diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index cb55dc9f69..11b6fd6618 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -44,6 +44,7 @@ def scheduler() -> None: stats_logger.incr("reports.scheduler") if not is_feature_enabled("ALERT_REPORTS"): + logger.info("Alert reports feature is not enabled. Exiting scheduler") return active_schedules = ReportScheduleDAO.find_active() triggered_at = ( @@ -57,31 +58,34 @@ def scheduler() -> None: triggered_at, active_schedule.crontab, active_schedule.timezone ): logger.info("Scheduling alert %s eta: %s", active_schedule.name, schedule) - async_options = {"eta": schedule} - if ( - active_schedule.working_timeout is not None - and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] - ): - async_options["time_limit"] = ( - active_schedule.working_timeout - + app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"] - ) - async_options["soft_time_limit"] = ( - active_schedule.working_timeout - + app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"] - ) - execute.apply_async((active_schedule.id,), **async_options) + async_options = {} + # if ( + # active_schedule.working_timeout is not None + # and app.config["ALERT_REPORTS_WORKING_TIME_OUT_KILL"] + # ): + # async_options["time_limit"] = ( + # active_schedule.working_timeout + # + app.config["ALERT_REPORTS_WORKING_TIME_OUT_LAG"] + # ) + # async_options["soft_time_limit"] = ( + # active_schedule.working_timeout + # + app.config["ALERT_REPORTS_WORKING_SOFT_TIME_OUT_LAG"] + # ) + logger.info("GOING TO EXECUTE %s", active_schedule.id) + execute.delay( + (active_schedule.id,), scheduled_dttm=schedule, **async_options + ) @celery_app.task(name="reports.execute", bind=True) -def execute(self: Celery.task, report_schedule_id: int) -> None: +def execute(self: Celery.task, report_schedule_id: int, scheduled_dttm) -> None: + logger.info("RECEIVED TASK %s", report_schedule_id) stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"] stats_logger.incr("reports.execute") task_id = None try: task_id = execute.request.id - scheduled_dttm = execute.request.eta logger.info( "Executing alert/report, task id: %s, scheduled_dttm: %s", task_id, diff --git a/superset/utils/slack.py b/superset/utils/slack.py index 8fa9013dfa..cc6db9077f 100644 --- a/superset/utils/slack.py +++ b/superset/utils/slack.py @@ -16,8 +16,15 @@ # under the License. +import logging + from flask import current_app from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError + +from superset.exceptions import SupersetException + +logger = logging.getLogger(__name__) class SlackClientError(Exception): @@ -31,6 +38,55 @@ def get_slack_client() -> WebClient: return WebClient(token=token, proxy=current_app.config["SLACK_PROXY"]) +def get_paginated_channels_with_search( + search_string: str = "", limit: int = 200 +) -> list: + """ + The slack api is paginated but does not include search, so we need to fetch + all channels and filter them ourselves + This will search by slack name or id + """ + + try: + client = get_slack_client() + channels = [] + cursor = None + + while True: + response = client.conversations_list(limit=limit, cursor=cursor) + channels.extend(response.data["channels"]) + cursor = response.data.get("response_metadata", {}).get("next_cursor") + if not cursor: + break + + # The search string can be multiple channels separated by commas + if search_string: + search_array = search_string.split(",") + + channels = [ + channel + for channel in channels + if any( + search.lower() in channel["name"].lower() + or search.lower() in channel["id"].lower() + for search in search_array + ) + ] + return channels + except (SlackClientError, SlackApiError) as ex: + raise SupersetException(f"Failed to list channels: {ex}") from ex + + +def should_use_v2_api() -> bool: + try: + client = get_slack_client() + client.conversations_list() + logger.warning("Slack API v2 is available") + return True + except SlackApiError: + return False + + def get_user_avatar(email: str, client: WebClient = None) -> str: client = client or get_slack_client() try: diff --git a/superset/views/base.py b/superset/views/base.py index be3af99147..8969a9759d 100644 --- a/superset/views/base.py +++ b/superset/views/base.py @@ -402,6 +402,7 @@ def cached_common_bootstrap_data( # pylint: disable=unused-argument frontend_config["ALERT_REPORTS_NOTIFICATION_METHODS"] = [ ReportRecipientType.EMAIL, ReportRecipientType.SLACK, + ReportRecipientType.SLACKV2, ] else: frontend_config["ALERT_REPORTS_NOTIFICATION_METHODS"] = [
