amoghrajesh commented on code in PR #59883:
URL: https://github.com/apache/airflow/pull/59883#discussion_r2667587980
##########
airflow-core/tests/unit/listeners/test_listeners.py:
##########
@@ -159,27 +151,25 @@ def
test_listener_captures_longrunning_taskinstances(create_task_instance_of_ope
@provide_session
-def test_class_based_listener(create_task_instance, session):
- lm = get_listener_manager()
+def test_class_based_listener(create_task_instance, session, listener_manager):
listener = class_listener.ClassBasedListener()
- lm.add_listener(listener)
+ listener_manager(listener)
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
ti.run()
assert listener.state == [TaskInstanceState.RUNNING,
TaskInstanceState.SUCCESS, DagRunState.SUCCESS]
-def test_listener_logs_call(caplog, create_task_instance, session):
- caplog.set_level(logging.DEBUG, logger="airflow.listeners.listener")
- lm = get_listener_manager()
- lm.add_listener(full_listener)
+def test_listener_logs_call(caplog, create_task_instance, session,
listener_manager):
+ caplog.set_level(logging.DEBUG,
logger="airflow.sdk._shared.listeners.listener")
Review Comment:
Unfortunately this is intentional: the `create_task_instance` fixture uses
`DagMaker`, which builds DAGs through the Task SDK, so the SDK logger path is
needed here.
##########
airflow-core/tests/unit/listeners/test_listeners.py:
##########
@@ -159,27 +151,25 @@ def
test_listener_captures_longrunning_taskinstances(create_task_instance_of_ope
@provide_session
-def test_class_based_listener(create_task_instance, session):
- lm = get_listener_manager()
+def test_class_based_listener(create_task_instance, session, listener_manager):
listener = class_listener.ClassBasedListener()
- lm.add_listener(listener)
+ listener_manager(listener)
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
ti.run()
assert listener.state == [TaskInstanceState.RUNNING,
TaskInstanceState.SUCCESS, DagRunState.SUCCESS]
-def test_listener_logs_call(caplog, create_task_instance, session):
- caplog.set_level(logging.DEBUG, logger="airflow.listeners.listener")
- lm = get_listener_manager()
- lm.add_listener(full_listener)
+def test_listener_logs_call(caplog, create_task_instance, session,
listener_manager):
+ caplog.set_level(logging.DEBUG,
logger="airflow.sdk._shared.listeners.listener")
+ listener_manager(full_listener)
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
ti.run()
- listener_logs = [r for r in caplog.record_tuples if r[0] ==
"airflow.listeners.listener"]
- assert all(r[:-1] == ("airflow.listeners.listener", logging.DEBUG) for r
in listener_logs)
+ listener_logs = [r for r in caplog.record_tuples if r[0] ==
"airflow.sdk._shared.listeners.listener"]
+ assert all(r[:-1] == ("airflow.sdk._shared.listeners.listener",
logging.DEBUG) for r in listener_logs)
Review Comment:
Same as above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]