Haebuk opened a new issue, #27499:
URL: https://github.com/apache/airflow/issues/27499

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow version: 2.2.2

   When I triggered a DAG to test `on_failure_callback`, the task failed as expected, but Airflow did not push the error message to my Slack channel.

   The error message was `TypeError: __init__() got an unexpected keyword argument 'logger'`.
   
   ### What you think should happen instead
   
   When the test task fails, the error message should be sent to my Slack channel.
   
   ### How to reproduce
   
   1. Start the mwaa-local-runner container.
   2. Trigger `test_alert_dag`.
   3. Check the task logs.
   
   requirements.txt
   ```
   --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.7.txt"
   apache-airflow[package-extra]==2.2.2
   apache-airflow-providers-postgres[amazon]
   apache-airflow-providers-cncf-kubernetes
   psycopg2
   fsspec
   s3fs
   pandas
   sagemaker==v1.72
   dag-factory==0.7.2
   ```
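   Since the constraints file and the provider versions listed below disagree (constraints 2.2.2 would not pin `apache-airflow-providers-slack==6.0.0`), it is worth confirming what actually got installed in the environment. A stdlib-only sketch (Python 3.8+; the distribution names are the ones from this report):

   ```python
   import importlib.metadata


   def installed_version(dist_name):
       """Return the installed version of a distribution, or None if absent."""
       try:
           return importlib.metadata.version(dist_name)
       except importlib.metadata.PackageNotFoundError:
           return None


   # Check the slack-related distributions mentioned in this report.
   for dist in ("apache-airflow-providers-slack", "slack-sdk"):
       print(f"{dist}: {installed_version(dist)}")
   ```

   If `slack-sdk` is missing or much older than what provider 6.0.0 declares as a dependency, that mismatch is a likely cause of the `TypeError` below.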
   
   
   ### Operating System
   
   NAME="Amazon Linux" VERSION="2" ID="amzn" ID_LIKE="centos rhel fedora" VERSION_ID="2" PRETTY_NAME="Amazon Linux 2" ANSI_COLOR="0;33" CPE_NAME="cpe:2.3:o:amazon:amazon_linux:2" HOME_URL="https://amazonlinux.com/"
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==2.4.0
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-cncf-kubernetes==2.1.0
   apache-airflow-providers-common-sql==1.2.0
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-http==2.0.1
   apache-airflow-providers-imap==2.0.1
   apache-airflow-providers-postgres==2.3.0
   apache-airflow-providers-slack==6.0.0
   apache-airflow-providers-sqlite==2.0.1
   
   ### Deployment
   
   MWAA
   
   ### Deployment details
   
   Run with mwaa-local-runner 2.2.2.
   
   ### Anything else
   
   1. on_failure_callback error
   ```
   *** Reading local file: 
/usr/local/airflow/logs/test_alert_dag/test_alert/2022-11-04T07:43:05.897406+00:00/1.log
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1035}} INFO - Dependencies all 
met for <TaskInstance: test_alert_dag.test_alert 
manual__2022-11-04T07:43:05.897406+00:00 [queued]>
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1035}} INFO - Dependencies all 
met for <TaskInstance: test_alert_dag.test_alert 
manual__2022-11-04T07:43:05.897406+00:00 [queued]>
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1241}} INFO - 
   
--------------------------------------------------------------------------------
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1242}} INFO - Starting attempt 
1 of 1
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1243}} INFO - 
   
--------------------------------------------------------------------------------
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1262}} INFO - Executing 
<Task(_PythonDecoratedOperator): test_alert> on 2022-11-04 07:43:05.897406+00:00
   [2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:52}} INFO - Started 
process 296 to run task
   [2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:76}} INFO - Running: 
['***', 'tasks', 'run', 'test_alert_dag', 'test_alert', 
'manual__2022-11-04T07:43:05.897406+00:00', '--job-id', '82', '--raw', 
'--subdir', 'DAGS_FOLDER/test_alert/test_alert_dag.py', '--cfg-path', 
'/tmp/tmpxq9x7u68', '--error-file', '/tmp/tmpyxijbtye']
   [2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:77}} INFO - Job 82: 
Subtask test_alert
   [2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} INFO - Running 
<TaskInstance: test_alert_dag.test_alert 
manual__2022-11-04T07:43:05.897406+00:00 [running]> on host 2a3d67b0b9bb
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1429}} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=***
   AIRFLOW_CTX_DAG_ID=test_alert_dag
   AIRFLOW_CTX_TASK_ID=test_alert
   AIRFLOW_CTX_EXECUTION_DATE=2022-11-04T07:43:05.897406+00:00
   AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-04T07:43:05.897406+00:00
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1703}} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1514, in _execute_task
       result = execute_callable(context=context)
     File "/usr/local/lib/python3.7/site-packages/airflow/decorators/base.py", 
line 134, in execute
       return_value = super().execute(context)
     File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", 
line 151, in execute
       return_value = self.execute_callable()
     File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", 
line 162, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/usr/local/airflow/dags/test_alert/test_alert_dag.py", line 29, in 
test_alert
       raise Exception("test")
   Exception: test
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1280}} INFO - Marking task as 
FAILED. dag_id=test_alert_dag, task_id=test_alert, 
execution_date=20221104T074305, start_date=20221104T074307, 
end_date=20221104T074307
   [2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:91}} ERROR - Failed to 
execute job 82 for task test_alert
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py",
 line 85, in _start_by_fork
       args.func(args, dag=self.dag)
     File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", 
line 48, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 
92, in wrapper
       return f(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", 
line 292, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", 
line 107, in _run_task_by_selected_method
       _run_raw_task(args, ti)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", 
line 184, in _run_raw_task
       error_file=args.error_file,
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1332, in _run_raw_task
       self._execute_task_with_callbacks(context)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1458, in _execute_task_with_callbacks
       result = self._execute_task(context, self.task)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1514, in _execute_task
       result = execute_callable(context=context)
     File "/usr/local/lib/python3.7/site-packages/airflow/decorators/base.py", 
line 134, in execute
       return_value = super().execute(context)
     File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", 
line 151, in execute
       return_value = self.execute_callable()
     File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", 
line 162, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/usr/local/airflow/dags/test_alert/test_alert_dag.py", line 29, in 
test_alert
       raise Exception("test")
   Exception: test
   [2022-11-04, 07:43:07 UTC] {{local_task_job.py:154}} INFO - Task exited with 
return code 1
   [2022-11-04, 07:43:07 UTC] {{base.py:79}} INFO - Using connection to: id: 
slack_webhook. Host: myhost, Port: None, Schema: , Login: #mychannel, Password: 
***, extra: {}
   [2022-11-04, 07:43:07 UTC] {{base.py:79}} INFO - Using connection to: id: 
slack_webhook. Host: myhost, Port: None, Schema: , Login: #mychannel, Password: 
***, extra: {}
   [2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} WARNING - 
/usr/local/***/.local/lib/python3.7/site-packages/***/providers/slack/operators/slack_webhook.py:157
 DeprecationWarning: Provide `webhook_token` as hook argument deprecated by 
security reason and will be removed in a future releases. Please specify it in 
`Slack Webhook` connection.
   [2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} WARNING - 
/usr/local/***/.local/lib/python3.7/site-packages/***/providers/slack/operators/slack_webhook.py:173
 UserWarning: Found unexpected keyword-argument(s) 'link_names' in `send` 
method. This argument(s) have no effect.
   [2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} WARNING - 
/usr/local/***/.local/lib/python3.7/site-packages/***/providers/slack/hooks/slack_webhook.py:47
 UserWarning: You cannot override the default channel (chosen by the user who 
installed your app), username, or icon when you're using Incoming Webhooks to 
post messages. Instead, these values will always inherit from the associated 
Slack app configuration. See: 
https://api.slack.com/messaging/webhooks#advanced_message_formatting. It is 
possible to change this values only in Legacy Slack Integration Incoming 
Webhook: 
https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations
   [2022-11-04, 07:43:07 UTC] {{base.py:79}} INFO - Using connection to: id: 
slack_webhook. Host: myhost, Port: None, Schema: , Login: #mychannel, Password: 
***, extra: {}
   [2022-11-04, 07:43:07 UTC] {{taskinstance.py:1580}} ERROR - Error when 
executing on_failure_callback
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1578, in _run_finished_callback
       task.on_failure_callback(context)
     File "/usr/local/airflow/plugins/awsairflowlib/hooks/alert.py", line 36, 
in slack_failure_alert
       return alert.execute(context=context)
     File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/operators/slack_webhook.py",
 line 173, in execute
       link_names=self.link_names,
     File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py",
 line 397, in send
       return self.send_dict(body=body, headers=headers)
     File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py",
 line 47, in wrapper
       resp = func(*args, **kwargs)
     File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py",
 line 347, in send_dict
       return self.client.send_dict(body, headers=headers)
     File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 36, 
in __get__
       value = obj.__dict__[self.func.__name__] = self.func(obj)
     File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py",
 line 198, in client
       return WebhookClient(**self._get_conn_params())
   TypeError: __init__() got an unexpected keyword argument 'logger'
   [2022-11-04, 07:43:07 UTC] {{local_task_job.py:264}} INFO - 0 downstream 
tasks scheduled from follow-on schedule check
   ```
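   For context, the final `TypeError` is the generic Python failure mode when a keyword argument is passed to an `__init__` that does not declare it: here the provider's hook builds the client with `WebhookClient(**self._get_conn_params())`, and the params dict includes `logger`, which suggests the installed `slack_sdk` is older than what provider 6.0.0 expects. A minimal, self-contained sketch of the same failure pattern (the class below is hypothetical, standing in for an older `WebhookClient`):

   ```python
   # Hypothetical stand-in for an older slack_sdk WebhookClient whose
   # __init__ does not yet accept a `logger` keyword argument.
   class OldWebhookClient:
       def __init__(self, url, timeout=30):
           self.url = url
           self.timeout = timeout


   # Newer provider code builds the client from a params dict that
   # includes `logger` -- unpacking it then fails just like the log above.
   conn_params = {"url": "https://hooks.slack.com/services/XXX", "logger": None}
   try:
       OldWebhookClient(**conn_params)
   except TypeError as exc:
       print(exc)  # ...got an unexpected keyword argument 'logger'
   ```

   Aligning `slack_sdk` with the installed provider's declared dependency (or pinning the provider back to the version in the 2.2.2 constraints file) should make the mismatch go away.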
   
   2. alert.py
   ```
   
   from airflow.hooks.base_hook import BaseHook
    from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
   
   
   def slack_failure_alert(context):
       slack_conn_id = "slack_webhook"
       slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
       channel = BaseHook.get_connection(slack_conn_id).login
       alert = SlackWebhookOperator(
           task_id="slack_failure_alert",
           http_conn_id=slack_conn_id,
           channel=channel,
        webhook_token=slack_webhook_token,
        icon_url="https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
           message="""
               *Result* Failed :alert:
               *Task*: {task}
               *Dag*: {dag}
               *Execution Time*: {exec_date}
               *Log Url*: {log_url}
               """.format(
               task=context.get("task_instance").task_id,
               dag=context.get("task_instance").dag_id,
               exec_date=context.get("dag_run").logical_date,
               log_url=context.get("task_instance").log_url,
           ),
       )
   
       return alert.execute(context=context)
   ```
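   The message template in `slack_failure_alert` can be exercised without a live Airflow run by passing a fake context. The snippet below is a sketch only, using `SimpleNamespace` stand-ins for the `TaskInstance` and `DagRun` objects (the attribute names match the ones the callback above reads):

   ```python
   from types import SimpleNamespace

   # Fake Airflow context carrying just the attributes the callback accesses.
   context = {
       "task_instance": SimpleNamespace(
           task_id="test_alert",
           dag_id="test_alert_dag",
           log_url="http://localhost:8080/log?dag_id=test_alert_dag",
       ),
       "dag_run": SimpleNamespace(logical_date="2022-11-04T07:43:05+00:00"),
   }

   # Same template as in slack_failure_alert above.
   message = """
       *Result* Failed :alert:
       *Task*: {task}
       *Dag*: {dag}
       *Execution Time*: {exec_date}
       *Log Url*: {log_url}
       """.format(
       task=context.get("task_instance").task_id,
       dag=context.get("task_instance").dag_id,
       exec_date=context.get("dag_run").logical_date,
       log_url=context.get("task_instance").log_url,
   )
   print(message)
   ```

   This confirms the template itself formats cleanly; the failure in the log happens later, when the operator's `execute()` builds the webhook client.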
   
   3. test code
   ```
   ###
   # Copyright 2013-2022 AFI, Inc. All Rights Reserved.
   ###
   
   from datetime import timedelta
   
   from airflow.decorators import dag, task
   from airflow.utils.dates import days_ago
   
   from common_utils.alert import slack_failure_alert
   
   default_args = {
       "owner": "airflow",
       "retries": 0,
       "on_failure_callback": slack_failure_alert,
       "retry_delay": timedelta(seconds=5),
   }
   
   
   @dag(
       default_args=default_args,
       schedule_interval=None,
       start_date=days_ago(1),
       catchup=False,
       tags=["test", "alert", "slack"],
   )
   def test_alert_dag():
       @task()
       def test_alert():
           raise Exception("test")
   
       test_alert()
   
   
   test_alert_dag = test_alert_dag()
   
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

