ipaddicting opened a new pull request, #58746: URL: https://github.com/apache/airflow/pull/58746
This PR fixes a serialization issue with Redis messages in AwaitMessageTrigger: [Object of type 'bytes' is not JSON serializable](https://stackoverflow.com/questions/44682018/typeerror-object-of-type-bytes-is-not-json-serializable). FYI, the error logs without this fix are as follows: ``` triggerer | 2025-11-27T02:21:31.725627Z [info ] 1 triggers currently running [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 triggerer | 2025-11-27T02:21:31.725707Z [info ] 0 watchers currently running [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 triggerer | 2025-11-27T02:21:32.468039Z [info ] Trigger fired event [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 name='ID 3' result='TriggerEvent<{\'type\': \'message\', \'pattern\': None, \'channel\': b\'hug_alarm_events\', \'data\': b\'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}\'}>' triggerer | 2025-11-27T02:21:32.468319Z [info ] trigger completed [airflow.jobs.triggerer_job_runner] loc=triggerer_job_runner.py:735 name='ID 3' triggerer | 2025-11-27T02:21:32.746994Z [error ] Exception when executing TriggerRunnerSupervisor.run [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:173 triggerer | Traceback (most recent call last): triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context triggerer | context = constructor( triggerer | ^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled triggerer | processors[key](compiled_params[key]) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process triggerer | return impl_processor(process_param(value, dialect)) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File 
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process triggerer | serialized = json_serializer(value) triggerer | ^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps triggerer | return _default_encoder.encode(obj) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode triggerer | chunks = self.iterencode(o, _one_shot=True) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode triggerer | return _iterencode(o, 0) triggerer | ^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default triggerer | raise TypeError(f'Object of type {o.__class__.__name__} ' triggerer | TypeError: Object of type bytes is not JSON serializable triggerer | The above exception was the direct cause of the following exception: triggerer | Traceback (most recent call last): triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute triggerer | self.trigger_runner.run() triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run triggerer | self.handle_events() triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper triggerer | return func(*args, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events triggerer | Trigger.submit_event(trigger_id=trigger_id, event=event) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper triggerer | return func(*args, session=session, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer 
| File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event triggerer | AssetManager.register_asset_change( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change triggerer | session.flush() # Ensure the event is written earlier than DDRQ entries below. triggerer | ^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush triggerer | self._flush(objects) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush triggerer | with util.safe_reraise(): triggerer | ^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ triggerer | compat.raise_( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ triggerer | raise exception triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush triggerer | flush_context.execute() triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute triggerer | rec.execute(self) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute triggerer | util.preloaded.orm_persistence.save_obj( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj triggerer | _emit_insert_statements( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements triggerer | result = connection._execute_20( triggerer | ^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, 
in _execute_20 triggerer | return meth(self, args_10style, kwargs_10style, execution_options) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection triggerer | return connection._execute_clauseelement( triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement triggerer | ret = self._execute_context( triggerer | ^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context triggerer | self._handle_dbapi_exception( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception triggerer | util.raise_( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ triggerer | raise exception triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context triggerer | context = constructor( triggerer | ^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled triggerer | processors[key](compiled_params[key]) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process triggerer | return impl_processor(process_param(value, dialect)) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process triggerer | serialized = json_serializer(value) triggerer | ^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps triggerer | return 
_default_encoder.encode(obj) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode triggerer | chunks = self.iterencode(o, _one_shot=True) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode triggerer | return _iterencode(o, 0) triggerer | ^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default triggerer | raise TypeError(f'Object of type {o.__class__.__name__} ' triggerer | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable triggerer | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (%s, %s, %s, %s, %s, %s)] triggerer | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'hug_alarm_events', 'data': b'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}'}}, 'asset_id': 3, 'source_task_id': None, 'source_run_id': None, 'source_dag_id': None}]] triggerer | 2025-11-27T02:21:32.757560Z [info ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:176 triggerer | 2025-11-27T02:21:32.761020Z [info ] Process exited [supervisor] exit_code=-2 loc=supervisor.py:709 pid=8484 signal_sent=SIGINT triggerer | 2025-11-27T02:21:32.761116Z [info ] Exited trigger loop [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:181 triggerer | Traceback (most recent call last): triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context triggerer | context = constructor( triggerer | ^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled triggerer | 
processors[key](compiled_params[key]) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process triggerer | return impl_processor(process_param(value, dialect)) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process triggerer | serialized = json_serializer(value) triggerer | ^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps triggerer | return _default_encoder.encode(obj) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode triggerer | chunks = self.iterencode(o, _one_shot=True) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode triggerer | return _iterencode(o, 0) triggerer | ^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default triggerer | raise TypeError(f'Object of type {o.__class__.__name__} ' triggerer | TypeError: Object of type bytes is not JSON serializable triggerer | The above exception was the direct cause of the following exception: triggerer | Traceback (most recent call last): triggerer | File "/home/airflow/.local/bin/airflow", line 7, in <module> triggerer | sys.exit(main()) triggerer | ^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main triggerer | args.func(args) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command triggerer | return func(*args, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper triggerer | return f(*args, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^ triggerer | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function triggerer | return func(*args, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer triggerer | run_command_with_daemon_option( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option triggerer | callback() triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in <lambda> triggerer | callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate), triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run triggerer | run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper triggerer | return func(*args, session=session, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job triggerer | return execute_job(job, execute_callable=execute_callable) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job triggerer | ret = execute_callable() triggerer | ^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute triggerer | self.trigger_runner.run() triggerer | File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run triggerer | self.handle_events() triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper triggerer | return func(*args, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events triggerer | Trigger.submit_event(trigger_id=trigger_id, event=event) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper triggerer | return func(*args, session=session, **kwargs) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event triggerer | AssetManager.register_asset_change( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change triggerer | session.flush() # Ensure the event is written earlier than DDRQ entries below. 
triggerer | ^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush triggerer | self._flush(objects) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush triggerer | with util.safe_reraise(): triggerer | ^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ triggerer | compat.raise_( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ triggerer | raise exception triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush triggerer | flush_context.execute() triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute triggerer | rec.execute(self) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute triggerer | util.preloaded.orm_persistence.save_obj( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj triggerer | _emit_insert_statements( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements triggerer | result = connection._execute_20( triggerer | ^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20 triggerer | return meth(self, args_10style, kwargs_10style, execution_options) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection triggerer | return connection._execute_clauseelement( triggerer | 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement triggerer | ret = self._execute_context( triggerer | ^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context triggerer | self._handle_dbapi_exception( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception triggerer | util.raise_( triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ triggerer | raise exception triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context triggerer | context = constructor( triggerer | ^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled triggerer | processors[key](compiled_params[key]) triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process triggerer | return impl_processor(process_param(value, dialect)) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process triggerer | serialized = json_serializer(value) triggerer | ^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps triggerer | return _default_encoder.encode(obj) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode triggerer | chunks = self.iterencode(o, _one_shot=True) triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode triggerer | return 
_iterencode(o, 0) triggerer | ^^^^^^^^^^^^^^^^^ triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default triggerer | raise TypeError(f'Object of type {o.__class__.__name__} ' triggerer | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable triggerer | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (%s, %s, %s, %s, %s, %s)] triggerer | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'hug_alarm_events', 'data': b'{"alarm_id":1234, "status":"Open", "timestamp":"2025-11-25 16:01:57"}'}}, 'asset_id': 3, 'source_task_id': None, 'source_run_id': None, 'source_dag_id': None}]] ``` Basically, the TriggerEvent couldn't be persisted due to the JSON serialization error, and the triggerer instance would crash after that. And all tests for AwaitMessageTrigger were mocked... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
