seanghaeli opened a new pull request, #56160:
URL: https://github.com/apache/airflow/pull/56160

   Implementation and unit test for HTTP Notifier. Here's the Dag I used to 
test that it works end-to-end
   
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.providers.http.notifications import HttpNotifier
   
   http_notifier = HttpNotifier(
       http_conn_id="local_server_conn",
       endpoint="",
       method="POST",
       json={
           "dag_id": "{{ dag.dag_id }}",
           "task_id": "{{ task.task_id }}",
           "execution_date": "{{ ds }}",
           "status": "success"
       }
   )
   
   dag = DAG(
       "test_http_notifier",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       on_success_callback=http_notifier,
   )
   
   test_task = BashOperator(
       task_id="test_task",
       bash_command="echo 'Testing HTTP notifier'",
       dag=dag,
   )
   ```
   
   I add a connection within airflow using: 
   
   ```
   airflow connections add local_server_conn \ 
   --conn-type http \ 
   --conn-host host.docker.internal \ 
   --conn-port 8081 \ 
   --conn-schema http
   ```
   
   And spin up a local API server via python:
   
   ```
   python3 -c "
   import http.server
   import socketserver
   import json
   from datetime import datetime
   
   class Handler(http.server.SimpleHTTPRequestHandler):
       def do_POST(self):
           print(f'[{datetime.now().strftime(\"%H:%M:%S\")}] 🚁 AIRFLOW 
NOTIFICATION!')
           
           length = int(self.headers.get('Content-Length', 0))
           data = self.rfile.read(length).decode('utf-8')
           
           try:
               json_data = json.loads(data)
               print(f'📋 Airflow sent:')
               print(json.dumps(json_data, indent=2))
           except:
               print(f'📝 Raw data: {data}')
           
           print('=' * 60)
           
           self.send_response(200)
           self.send_header('Content-Type', 'application/json')
           self.end_headers()
           self.wfile.write(b'{\"status\": \"notification received!\"}')
   
   print('AIRFLOW NOTIFICATION SERVER READY')
   print('Listening on http://localhost:8081')
   print('=' * 60)
   socketserver.TCPServer(('', 8081), Handler).serve_forever()
   "
   ```
   
   Then I run the above Dag, and on the CLI with the python API-server running 
I can see the contents of the POST request that is triggered when the Dag 
completes successfully.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to