ipaddicting opened a new pull request, #58746:
URL: https://github.com/apache/airflow/pull/58746

   This PR fixes a serialization issue with Redis messages in 
AwaitMessageTrigger: [Object of type 'bytes' is not JSON 
serializable](https://stackoverflow.com/questions/44682018/typeerror-object-of-type-bytes-is-not-json-serializable).
   
   FYI, the error logs without this fix are as follows:
   ```
   triggerer  | 2025-11-27T02:21:31.725627Z [info     ] 1 triggers currently 
running   [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735
   triggerer  | 2025-11-27T02:21:31.725707Z [info     ] 0 watchers currently 
running   [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735
   triggerer  | 2025-11-27T02:21:32.468039Z [info     ] Trigger fired event     
       [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 
name='ID 3' result='TriggerEvent<{\'type\': \'message\', \'pattern\': None, 
\'channel\': b\'hug_alarm_events\', \'data\': b\'{"alarm_id":1234, 
"status":"Open", "timestamp":"2025-11-25 16:01:57"}\'}>'
   triggerer  | 2025-11-27T02:21:32.468319Z [info     ] trigger completed       
       [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 
name='ID 3'
   triggerer  | 2025-11-27T02:21:32.746994Z [error    ] Exception when 
executing TriggerRunnerSupervisor.run 
[airflow.jobs.triggerer_job_runner.TriggererJobRunner] 
loc=triggerer_job_runner.py:173
   triggerer  | Traceback (most recent call last):
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1810, in _execute_context
   triggerer  | context = constructor(
   triggerer  | ^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
 line 1078, in _init_compiled
   triggerer  | processors[key](compiled_params[key])
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", 
line 1668, in process
   triggerer  | return impl_processor(process_param(value, dialect))
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", 
line 2669, in process
   triggerer  | serialized = json_serializer(value)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, 
in dumps
   triggerer  | return _default_encoder.encode(obj)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in 
encode
   triggerer  | chunks = self.iterencode(o, _one_shot=True)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in 
iterencode
   triggerer  | return _iterencode(o, 0)
   triggerer  | ^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in 
default
   triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   triggerer  | TypeError: Object of type bytes is not JSON serializable
   triggerer  | The above exception was the direct cause of the following 
exception:
   triggerer  | Traceback (most recent call last):
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 170, in _execute
   triggerer  | self.trigger_runner.run()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 531, in run
   triggerer  | self.handle_events()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
   triggerer  | return func(*args, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 558, in handle_events
   triggerer  | Trigger.submit_event(trigger_id=trigger_id, event=event)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 100, in wrapper
   triggerer  | return func(*args, session=session, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", 
line 260, in submit_event
   triggerer  | AssetManager.register_asset_change(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", 
line 161, in register_asset_change
   triggerer  | session.flush()  # Ensure the event is written earlier than 
DDRQ entries below.
   triggerer  | ^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
   triggerer  | self._flush(objects)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
   triggerer  | with util.safe_reraise():
   triggerer  | ^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
   triggerer  | compat.raise_(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
   triggerer  | raise exception
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
   triggerer  | flush_context.execute()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
 line 456, in execute
   triggerer  | rec.execute(self)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
 line 630, in execute
   triggerer  | util.preloaded.orm_persistence.save_obj(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
 line 245, in save_obj
   triggerer  | _emit_insert_statements(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
 line 1238, in _emit_insert_statements
   triggerer  | result = connection._execute_20(
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
   triggerer  | return meth(self, args_10style, kwargs_10style, 
execution_options)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
   triggerer  | return connection._execute_clauseelement(
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
   triggerer  | ret = self._execute_context(
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1816, in _execute_context
   triggerer  | self._handle_dbapi_exception(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
   triggerer  | util.raise_(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
   triggerer  | raise exception
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1810, in _execute_context
   triggerer  | context = constructor(
   triggerer  | ^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
 line 1078, in _init_compiled
   triggerer  | processors[key](compiled_params[key])
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", 
line 1668, in process
   triggerer  | return impl_processor(process_param(value, dialect))
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", 
line 2669, in process
   triggerer  | serialized = json_serializer(value)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, 
in dumps
   triggerer  | return _default_encoder.encode(obj)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in 
encode
   triggerer  | chunks = self.iterencode(o, _one_shot=True)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in 
iterencode
   triggerer  | return _iterencode(o, 0)
   triggerer  | ^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in 
default
   triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   triggerer  | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of 
type bytes is not JSON serializable
   triggerer  | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, 
source_dag_id, source_run_id, timestamp) VALUES (%s, %s, %s, %s, %s, %s)]
   triggerer  | [parameters: [{'extra': {'from_trigger': True, 'payload': 
{'type': 'message', 'pattern': None, 'channel': b'hug_alarm_events', 'data': 
b'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}'}}, 
'asset_id': 3, 'source_task_id': None, 'source_run_id': None, 'source_dag_id': 
None}]]
   triggerer  | 2025-11-27T02:21:32.757560Z [info     ] Waiting for triggers to 
clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] 
loc=triggerer_job_runner.py:176
   triggerer  | 2025-11-27T02:21:32.761020Z [info     ] Process exited          
       [supervisor] exit_code=-2 loc=supervisor.py:709 pid=8484 
signal_sent=SIGINT
   triggerer  | 2025-11-27T02:21:32.761116Z [info     ] Exited trigger loop     
       [airflow.jobs.triggerer_job_runner.TriggererJobRunner] 
loc=triggerer_job_runner.py:181
   triggerer  | Traceback (most recent call last):
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1810, in _execute_context
   triggerer  | context = constructor(
   triggerer  | ^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
 line 1078, in _init_compiled
   triggerer  | processors[key](compiled_params[key])
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", 
line 1668, in process
   triggerer  | return impl_processor(process_param(value, dialect))
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", 
line 2669, in process
   triggerer  | serialized = json_serializer(value)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, 
in dumps
   triggerer  | return _default_encoder.encode(obj)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in 
encode
   triggerer  | chunks = self.iterencode(o, _one_shot=True)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in 
iterencode
   triggerer  | return _iterencode(o, 0)
   triggerer  | ^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in 
default
   triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   triggerer  | TypeError: Object of type bytes is not JSON serializable
   triggerer  | The above exception was the direct cause of the following 
exception:
   triggerer  | Traceback (most recent call last):
   triggerer  | File "/home/airflow/.local/bin/airflow", line 7, in <module>
   triggerer  | sys.exit(main())
   triggerer  | ^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 
55, in main
   triggerer  | args.func(args)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", 
line 49, in command
   triggerer  | return func(*args, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 
114, in wrapper
   triggerer  | return f(*args, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
 line 54, in wrapped_function
   triggerer  | return func(*args, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
 line 69, in triggerer
   triggerer  | run_command_with_daemon_option(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
   triggerer  | callback()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
 line 72, in <lambda>
   triggerer  | callback=lambda: triggerer_run(args.skip_serve_logs, 
args.capacity, triggerer_heartrate),
   triggerer  | 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
 line 55, in triggerer_run
   triggerer  | run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 100, in wrapper
   triggerer  | return func(*args, session=session, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
368, in run_job
   triggerer  | return execute_job(job, execute_callable=execute_callable)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 
397, in execute_job
   triggerer  | ret = execute_callable()
   triggerer  | ^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 170, in _execute
   triggerer  | self.trigger_runner.run()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 531, in run
   triggerer  | self.handle_events()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", 
line 58, in wrapper
   triggerer  | return func(*args, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 558, in handle_events
   triggerer  | Trigger.submit_event(trigger_id=trigger_id, event=event)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", 
line 100, in wrapper
   triggerer  | return func(*args, session=session, **kwargs)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", 
line 260, in submit_event
   triggerer  | AssetManager.register_asset_change(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", 
line 161, in register_asset_change
   triggerer  | session.flush()  # Ensure the event is written earlier than 
DDRQ entries below.
   triggerer  | ^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3449, in flush
   triggerer  | self._flush(objects)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3588, in _flush
   triggerer  | with util.safe_reraise():
   triggerer  | ^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py",
 line 70, in __exit__
   triggerer  | compat.raise_(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
   triggerer  | raise exception
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", 
line 3549, in _flush
   triggerer  | flush_context.execute()
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
 line 456, in execute
   triggerer  | rec.execute(self)
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
 line 630, in execute
   triggerer  | util.preloaded.orm_persistence.save_obj(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
 line 245, in save_obj
   triggerer  | _emit_insert_statements(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
 line 1238, in _emit_insert_statements
   triggerer  | result = connection._execute_20(
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1710, in _execute_20
   triggerer  | return meth(self, args_10style, kwargs_10style, 
execution_options)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", 
line 334, in _execute_on_connection
   triggerer  | return connection._execute_clauseelement(
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1577, in _execute_clauseelement
   triggerer  | ret = self._execute_context(
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1816, in _execute_context
   triggerer  | self._handle_dbapi_exception(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 2134, in _handle_dbapi_exception
   triggerer  | util.raise_(
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", 
line 211, in raise_
   triggerer  | raise exception
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", 
line 1810, in _execute_context
   triggerer  | context = constructor(
   triggerer  | ^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
 line 1078, in _init_compiled
   triggerer  | processors[key](compiled_params[key])
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", 
line 1668, in process
   triggerer  | return impl_processor(process_param(value, dialect))
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", 
line 2669, in process
   triggerer  | serialized = json_serializer(value)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, 
in dumps
   triggerer  | return _default_encoder.encode(obj)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in 
encode
   triggerer  | chunks = self.iterencode(o, _one_shot=True)
   triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in 
iterencode
   triggerer  | return _iterencode(o, 0)
   triggerer  | ^^^^^^^^^^^^^^^^^
   triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in 
default
   triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   triggerer  | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of 
type bytes is not JSON serializable
   triggerer  | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, 
source_dag_id, source_run_id, timestamp) VALUES (%s, %s, %s, %s, %s, %s)]
   triggerer  | [parameters: [{'extra': {'from_trigger': True, 'payload': 
{'type': 'message', 'pattern': None, 'channel': b'hug_alarm_events', 'data': 
b'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}'}}, 
'asset_id': 3, 'source_task_id': None, 'source_run_id': None, 'source_dag_id': 
None}]]
   ``` 
   
   Basically, the TriggerEvent cannot be persisted because of the JSON 
serialization error, and the triggerer instance crashes afterwards.
   
   Also, all existing tests for AwaitMessageTrigger were mocked, so they did not catch this error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to