AlexisBRENON opened a new issue #14880:
URL: https://github.com/apache/airflow/issues/14880
<!--
Welcome to Apache Airflow! For a smooth issue process, try to answer the
following questions.
Don't worry if they're not all applicable; just try to include what you can
:-)
If you need to include code snippets or logs, please put them in fenced code
blocks. If they're super-long, please use the details tag like
<details><summary>super-long log</summary> lots of stuff </details>
Please delete these comment blocks before submitting the issue.
-->
<!--
IMPORTANT!!!
PLEASE CHECK "SIMILAR TO X EXISTING ISSUES" OPTION IF VISIBLE
NEXT TO "SUBMIT NEW ISSUE" BUTTON!!!
PLEASE CHECK IF THIS ISSUE HAS BEEN REPORTED PREVIOUSLY USING SEARCH!!!
Please complete the next sections or the issue will be closed.
These questions are the first thing we need to know to understand the
context.
-->
**Apache Airflow version**: 2.0.1
**Environment**: Docker
- **Cloud provider or hardware configuration**: Local file system
- **OS** (e.g. from /etc/os-release): Arch Linux
- **Kernel** (e.g. `uname -a`): 5.11.5-arch1-1
**What happened**:
I tried to post a file from a long Python string to a Slack channel through
the SlackAPIFileOperator.
I defined the operator this way:
```
SlackAPIFileOperator(
task_id="{}-notifier".format(self.task_id),
channel="#alerts-metrics",
token=MY_TOKEN,
initial_comment=":warning: alert",
filename="{{ ds }}.csv",
filetype="csv",
content=df.to_csv()
)
```
Task failed with the following error:
```
DEBUG - Sending a request - url: https://www.slack.com/api/files.upload,
query_params: {}, body_params: {}, files: {}, json_body: {'channels':
'#alerts-metrics', 'content': '<a long pandas.DataFrame.to_csv output>',
'filename': '{{ ds }}.csv', 'filetype': 'csv', 'initial_comment': ':warning:
alert'}, headers: {'Content-Type': 'application/json;charset=utf-8',
'Authorization': '(redacted)', 'User-Agent': 'Python/3.6.12 slackclient/3.3.2
Linux/5.11.5-arch1-1'}
DEBUG - Received the following response - status: 200, headers: {'date':
'Thu, 18 Mar 2021 13:28:44 GMT', 'server': 'Apache', 'x-xss-protection': '0',
'pragma': 'no-cache', 'cache-control': 'private, no-cache, no-store,
must-revalidate', 'access-control-allow-origin': '*',
'strict-transport-security': 'max-age=31536000; includeSubDomains; preload',
'x-slack-req-id': '0ff5fd17ca7e2e8397559b6347b34820', 'x-content-type-options':
'nosniff', 'referrer-policy': 'no-referrer', 'access-control-expose-headers':
'x-slack-req-id, retry-after', 'x-slack-backend': 'r', 'x-oauth-scopes':
'incoming-webhook,files:write,chat:write', 'x-accepted-oauth-scopes':
'files:write', 'expires': 'Mon, 26 Jul 1997 05:00:00 GMT', 'vary':
'Accept-Encoding', 'access-control-allow-headers': 'slack-route,
x-slack-version-ts, x-b3-traceid, x-b3-spanid, x-b3-parentspanid, x-b3-sampled,
x-b3-flags', 'content-type': 'application/json; charset=utf-8',
'x-envoy-upstream-service-time': '37', 'x-backend': 'files_normal f
iles_bedrock_normal_with_overflow files_canary_with_overflow
files_bedrock_canary_with_overflow files_control_with_overflow
files_bedrock_control_with_overflow', 'x-server':
'slack-www-hhvm-files-iad-xg4a', 'x-via': 'envoy-www-iad-xvw3,
haproxy-edge-lhr-u1ge', 'x-slack-shared-secret-outcome': 'shared-secret',
'via': 'envoy-www-iad-xvw3', 'connection': 'close', 'transfer-encoding':
'chunked'}, body: {'ok': False, 'error': 'no_file_data'}
[2021-03-18 13:28:43,601] {taskinstance.py:1455} ERROR - The request to the
Slack API failed.
The server responded with: {'ok': False, 'error': 'no_file_data'}
```
**What you expected to happen**:
I expect the operator to succeed and see a new message in Slack with a
snippet of a downloadable CSV file.
**How to reproduce it**:
Just declare a DAG this way:
```
from airflow import DAG
from airflow.providers.slack.operators.slack import SlackAPIFileOperator
from pendulum import datetime
with DAG(dag_id="SlackFile",
default_args=dict(start_date=datetime(2021, 1, 1), owner='airflow',
catchup=False)) as dag:
SlackAPIFileOperator(
task_id="Slack",
token=YOUR_TOKEN,
content="test-content"
)
```
And try to run it.
**Anything else we need to know**:
This seems to be a known issue:
https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1616079965083200
I workaround it with this following re-implementation:
```
from typing import Optional, Any
from airflow import AirflowException
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.operators.slack import SlackAPIOperator
from airflow.utils.decorators import apply_defaults
class SlackAPIFileOperator(SlackAPIOperator):
"""
Send a file to a slack channel
Examples:
.. code-block:: python
slack = SlackAPIFileOperator(
task_id="slack_file_upload",
dag=dag,
slack_conn_id="slack",
channel="#general",
initial_comment="Hello World!",
file="hello_world.csv",
filename="hello_world.csv",
filetype="csv",
content="hello,world,csv,file",
)
:param channel: channel in which to sent file on slack name (templated)
:type channel: str
:param initial_comment: message to send to slack. (templated)
:type initial_comment: str
:param file: the file (templated)
:type file: str
:param filename: name of the file (templated)
:type filename: str
:param filetype: slack filetype. (templated)
- see https://api.slack.com/types/file
:type filetype: str
:param content: file content. (templated)
:type content: str
"""
template_fields = ('channel', 'initial_comment', 'file', 'filename',
'filetype', 'content')
ui_color = '#44BEDF'
@apply_defaults
def __init__(
self,
channel: str = '#general',
initial_comment: str = 'No message has been set!',
file: Optional[str] = None,
filename: str = 'default_name.csv',
filetype: str = 'csv',
content: Optional[str] = None,
**kwargs,
) -> None:
if (content is None) and (file is None):
raise AirflowException('At least one of "content" or "file"
should be defined.')
self.method = 'files.upload'
self.channel = channel
self.initial_comment = initial_comment
self.file = file
self.filename = filename
self.filetype = filetype
self.content = content
super().__init__(method=self.method, **kwargs)
def execute(self, **kwargs):
slack = SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)
args = dict(
channels=self.channel,
filename=self.filename,
filetype=self.filetype,
initial_comment=self.initial_comment
)
if self.content is not None:
args['content'] = self.content
elif self.file is not None:
args['file'] = self.content
slack.call(self.method, data=args)
def construct_api_call_params(self) -> Any:
pass
```
Maybe it is not the best solution as it does not leverage work from
`SlackAPIOperator`.
But at least, it fullfill my use case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]