Haebuk opened a new issue, #27499: URL: https://github.com/apache/airflow/issues/27499
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

Airflow version: 2.2.2

When I triggered a DAG to test `on_failure_callback`, a task failed but Airflow didn't push the error message to my Slack channel. The error message was `TypeError: __init__() got an unexpected keyword argument 'logger'`.

### What you think should happen instead

When the test code fails, Airflow should send the error message to my Slack channel.

### How to reproduce

1. Start the MWAA local runner container.
2. Trigger test_alert_dag().
3. See the logs.

requirements.txt

```
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.2/constraints-3.7.txt"
apache-airflow[package-extra]==2.2.2
apache-airflow-providers-postgres[amazon]
apache-airflow-providers-cncf-kubernetes
psycopg2
fsspec
s3fs
pandas
sagemaker==v1.72
dag-factory==0.7.2
```

### Operating System

NAME="Amazon Linux"
VERSION="2"
ID="amzn"
ID_LIKE="centos rhel fedora"
VERSION_ID="2"
PRETTY_NAME="Amazon Linux 2"
ANSI_COLOR="0;33"
CPE_NAME="cpe:2.3:o:amazon:amazon_linux:2"
HOME_URL="https://amazonlinux.com/"

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon==2.4.0
apache-airflow-providers-celery==2.1.0
apache-airflow-providers-cncf-kubernetes==2.1.0
apache-airflow-providers-common-sql==1.2.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-postgres==2.3.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-sqlite==2.0.1

### Deployment

MWAA

### Deployment details

Run with MWAA local runner 2.2.2.

### Anything else

1. 
on_failure_callback error

```
*** Reading local file: /usr/local/airflow/logs/test_alert_dag/test_alert/2022-11-04T07:43:05.897406+00:00/1.log
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1035}} INFO - Dependencies all met for <TaskInstance: test_alert_dag.test_alert manual__2022-11-04T07:43:05.897406+00:00 [queued]>
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1035}} INFO - Dependencies all met for <TaskInstance: test_alert_dag.test_alert manual__2022-11-04T07:43:05.897406+00:00 [queued]>
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1241}} INFO - --------------------------------------------------------------------------------
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1242}} INFO - Starting attempt 1 of 1
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1243}} INFO - --------------------------------------------------------------------------------
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1262}} INFO - Executing <Task(_PythonDecoratedOperator): test_alert> on 2022-11-04 07:43:05.897406+00:00
[2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:52}} INFO - Started process 296 to run task
[2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:76}} INFO - Running: ['***', 'tasks', 'run', 'test_alert_dag', 'test_alert', 'manual__2022-11-04T07:43:05.897406+00:00', '--job-id', '82', '--raw', '--subdir', 'DAGS_FOLDER/test_alert/test_alert_dag.py', '--cfg-path', '/tmp/tmpxq9x7u68', '--error-file', '/tmp/tmpyxijbtye']
[2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:77}} INFO - Job 82: Subtask test_alert
[2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} INFO - Running <TaskInstance: test_alert_dag.test_alert manual__2022-11-04T07:43:05.897406+00:00 [running]> on host 2a3d67b0b9bb
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1429}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=test_alert_dag
AIRFLOW_CTX_TASK_ID=test_alert
AIRFLOW_CTX_EXECUTION_DATE=2022-11-04T07:43:05.897406+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-04T07:43:05.897406+00:00
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1703}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/decorators/base.py", line 134, in execute
    return_value = super().execute(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/test_alert/test_alert_dag.py", line 29, in test_alert
    raise Exception("test")
Exception: test
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1280}} INFO - Marking task as FAILED. dag_id=test_alert_dag, task_id=test_alert, execution_date=20221104T074305, start_date=20221104T074307, end_date=20221104T074307
[2022-11-04, 07:43:07 UTC] {{standard_task_runner.py:91}} ERROR - Failed to execute job 82 for task test_alert
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 184, in _run_raw_task
    error_file=args.error_file,
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/decorators/base.py", line 134, in execute
    return_value = super().execute(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/test_alert/test_alert_dag.py", line 29, in test_alert
    raise Exception("test")
Exception: test
[2022-11-04, 07:43:07 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 1
[2022-11-04, 07:43:07 UTC] {{base.py:79}} INFO - Using connection to: id: slack_webhook. Host: myhost, Port: None, Schema: , Login: #mychannel, Password: ***, extra: {}
[2022-11-04, 07:43:07 UTC] {{base.py:79}} INFO - Using connection to: id: slack_webhook. Host: myhost, Port: None, Schema: , Login: #mychannel, Password: ***, extra: {}
[2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} WARNING - /usr/local/***/.local/lib/python3.7/site-packages/***/providers/slack/operators/slack_webhook.py:157 DeprecationWarning: Provide `webhook_token` as hook argument deprecated by security reason and will be removed in a future releases. Please specify it in `Slack Webhook` connection.
[2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} WARNING - /usr/local/***/.local/lib/python3.7/site-packages/***/providers/slack/operators/slack_webhook.py:173 UserWarning: Found unexpected keyword-argument(s) 'link_names' in `send` method. This argument(s) have no effect.
[2022-11-04, 07:43:07 UTC] {{logging_mixin.py:109}} WARNING - /usr/local/***/.local/lib/python3.7/site-packages/***/providers/slack/hooks/slack_webhook.py:47 UserWarning: You cannot override the default channel (chosen by the user who installed your app), username, or icon when you're using Incoming Webhooks to post messages. Instead, these values will always inherit from the associated Slack app configuration. See: https://api.slack.com/messaging/webhooks#advanced_message_formatting. It is possible to change this values only in Legacy Slack Integration Incoming Webhook: https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations
[2022-11-04, 07:43:07 UTC] {{base.py:79}} INFO - Using connection to: id: slack_webhook. Host: myhost, Port: None, Schema: , Login: #mychannel, Password: ***, extra: {}
[2022-11-04, 07:43:07 UTC] {{taskinstance.py:1580}} ERROR - Error when executing on_failure_callback
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1578, in _run_finished_callback
    task.on_failure_callback(context)
  File "/usr/local/airflow/plugins/awsairflowlib/hooks/alert.py", line 36, in slack_failure_alert
    return alert.execute(context=context)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/operators/slack_webhook.py", line 173, in execute
    link_names=self.link_names,
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py", line 397, in send
    return self.send_dict(body=body, headers=headers)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py", line 47, in wrapper
    resp = func(*args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py", line 347, in send_dict
    return self.client.send_dict(body, headers=headers)
  File "/usr/local/lib/python3.7/site-packages/cached_property.py", line 36, in __get__
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/providers/slack/hooks/slack_webhook.py", line 198, in client
    return WebhookClient(**self._get_conn_params())
TypeError: __init__() got an unexpected keyword argument 'logger'
[2022-11-04, 07:43:07 UTC] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

2. 
alert.py

```
from airflow.hooks.base_hook import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def slack_failure_alert(context):
    slack_conn_id = "slack_webhook"
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    channel = BaseHook.get_connection(slack_conn_id).login
    alert = SlackWebhookOperator(
        task_id="slack_failure_alert",
        http_conn_id=slack_conn_id,
        channel=channel,
        webhook_token=slack_webhook_token,
        icon_url="https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
        message="""
            *Result* Failed :alert:
            *Task*: {task}
            *Dag*: {dag}
            *Execution Time*: {exec_date}
            *Log Url*: {log_url}
            """.format(
            task=context.get("task_instance").task_id,
            dag=context.get("task_instance").dag_id,
            exec_date=context.get("dag_run").logical_date,
            log_url=context.get("task_instance").log_url,
        ),
    )
    return alert.execute(context=context)
```

3. test code

```
###
# Copyright 2013-2022 AFI, Inc. All Rights Reserved.
###

from datetime import timedelta

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from common_utils.alert import slack_failure_alert

default_args = {
    "owner": "airflow",
    "retries": 0,
    "on_failure_callback": slack_failure_alert,
    "retry_delay": timedelta(seconds=5),
}


@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    catchup=False,
    tags=["test", "alert", "slack"],
)
def test_alert_dag():
    @task()
    def test_alert():
        raise Exception("test")

    test_alert()


test_alert_dag = test_alert_dag()
```

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
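A possible way to narrow this down: the last frame of the callback traceback shows `WebhookClient(**self._get_conn_params())` rejecting the `logger` keyword, which might mean the installed Slack SDK is older than what `apache-airflow-providers-slack==6.0.0` expects. The sketch below is only an illustration of how one could check whether a constructor accepts a given keyword; `accepts_keyword`, `OldClient`, and `NewClient` are hypothetical names and are not part of Airflow or the Slack SDK.

```python
import inspect


def accepts_keyword(func, name):
    """Return True if `func` can be called with a keyword argument `name`."""
    params = inspect.signature(func).parameters
    if name in params and params[name].kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    # A **kwargs catch-all accepts any keyword.
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


# Hypothetical stand-ins for an older and a newer client class. They are NOT
# the real Slack SDK classes; they only illustrate the two constructor shapes.
class OldClient:
    def __init__(self, url, timeout=30):
        self.url = url


class NewClient:
    def __init__(self, url, timeout=30, logger=None):
        self.url = url


print(accepts_keyword(OldClient, "logger"))  # False
print(accepts_keyword(NewClient, "logger"))  # True
```

Inside the MWAA local runner container, the same helper could be pointed at the real class (`from slack_sdk.webhook import WebhookClient`) to see whether the installed version accepts `logger`.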
