jon-petty-imperfect opened a new issue, #59248:
URL: https://github.com/apache/airflow/issues/59248

   ### Apache Airflow version
   
   3.1.3
   
   ### If "Other Airflow 2/3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   **Summary**
   
   I am trying to set up a DAG that triggers when messages are dropped into a Valkey queue.
   
   I expect the following to happen.
   - I write a message to the Valkey Queue with some JSON in it.
   - The Airflow MessageQueueTrigger sees the message in Valkey, and it 
triggers the DAG.
   
   Instead, the following appears to be happening (a minimal illustration is shown after this list).
   - I write a message to the Valkey queue with some JSON in it.
   - Airflow reads the message as bytes and attempts to JSON-serialize the raw payload so that it can write an `asset_event` row to the Airflow database.
   - An exception is thrown because bytes cannot be JSON-serialized.
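
   The failure is easy to reproduce outside Airflow. Below is a minimal sketch (plain Python, not Airflow code) assuming the pub/sub payload keeps `channel` and `data` as bytes, which matches the parameters in the exception further down.

   ```
   import json

   # The payload shape redis-py returns for a pub/sub message; channel and
   # data stay as bytes.
   payload = {
       "type": "message",
       "pattern": None,
       "channel": b"test_airflow_queue",
       "data": b'{"hello":"world"}',
   }

   try:
       json.dumps(payload)
   except TypeError as err:
       print(err)  # Object of type bytes is not JSON serializable

   # Decoding the bytes fields first makes the payload serializable.
   decoded = {k: (v.decode("utf-8") if isinstance(v, bytes) else v) for k, v in payload.items()}
   print(json.dumps(decoded))
   ```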
   
   Versions:
   - Airflow: 3.1.3
     - Running as standalone in the official Docker image.
     - The following pip packages are installed:
       - `apache-airflow-providers-redis`
       - `apache-airflow-providers-common-messaging`
       - `redis`
   - Valkey: 9.0
     - Running in a separate container.
   
   **Extra Information**
   
   The part of the exception that makes me suspect an encoding problem is shown below: the trigger payload carries the `channel` and the `data` as bytes, and the exception says bytes are not JSON serializable.
   
   ```
   airflow-server  | triggerer  | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
   airflow-server  | triggerer  | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)]
   airflow-server  | triggerer  | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'test_airflow_queue', 'data': b'{"hello":"world"}'}}, 'asset_id': 1, 'source_run_id': None, 'source_task_id': None, 'source_dag_id': None}]]
   ```
   
   The full exception block is the following.
   
   ```
   airflow-server  | triggerer  | 2025-12-09T16:51:35.143456Z [error    ] Exception when executing TriggerRunnerSupervisor.run [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:173
   airflow-server  | triggerer  | Traceback (most recent call last):
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
   airflow-server  | triggerer  | context = constructor(
   airflow-server  | triggerer  | ^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
   airflow-server  | triggerer  | processors[key](compiled_params[key])
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
   airflow-server  | triggerer  | return impl_processor(process_param(value, dialect))
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
   airflow-server  | triggerer  | serialized = json_serializer(value)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
   airflow-server  | triggerer  | return _default_encoder.encode(obj)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
   airflow-server  | triggerer  | chunks = self.iterencode(o, _one_shot=True)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
   airflow-server  | triggerer  | return _iterencode(o, 0)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
   airflow-server  | triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   airflow-server  | triggerer  | TypeError: Object of type bytes is not JSON serializable
   airflow-server  | triggerer  | The above exception was the direct cause of the following exception:
   airflow-server  | triggerer  | Traceback (most recent call last):
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
   airflow-server  | triggerer  | self.trigger_runner.run()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run
   airflow-server  | triggerer  | self.handle_events()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
   airflow-server  | triggerer  | return func(*args, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events
   airflow-server  | triggerer  | Trigger.submit_event(trigger_id=trigger_id, event=event)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
   airflow-server  | triggerer  | return func(*args, session=session, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event
   airflow-server  | triggerer  | AssetManager.register_asset_change(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change
   airflow-server  | triggerer  | session.flush()  # Ensure the event is written earlier than DDRQ entries below.
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
   airflow-server  | triggerer  | self._flush(objects)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
   airflow-server  | triggerer  | with util.safe_reraise():
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
   airflow-server  | triggerer  | compat.raise_(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
   airflow-server  | triggerer  | raise exception
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
   airflow-server  | triggerer  | flush_context.execute()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
   airflow-server  | triggerer  | rec.execute(self)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
   airflow-server  | triggerer  | util.preloaded.orm_persistence.save_obj(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
   airflow-server  | triggerer  | _emit_insert_statements(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
   airflow-server  | triggerer  | result = connection._execute_20(
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
   airflow-server  | triggerer  | return meth(self, args_10style, kwargs_10style, execution_options)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
   airflow-server  | triggerer  | return connection._execute_clauseelement(
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
   airflow-server  | triggerer  | ret = self._execute_context(
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
   airflow-server  | triggerer  | self._handle_dbapi_exception(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
   airflow-server  | triggerer  | util.raise_(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
   airflow-server  | triggerer  | raise exception
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
   airflow-server  | triggerer  | context = constructor(
   airflow-server  | triggerer  | ^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
   airflow-server  | triggerer  | processors[key](compiled_params[key])
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
   airflow-server  | triggerer  | return impl_processor(process_param(value, dialect))
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
   airflow-server  | triggerer  | serialized = json_serializer(value)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
   airflow-server  | triggerer  | return _default_encoder.encode(obj)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
   airflow-server  | triggerer  | chunks = self.iterencode(o, _one_shot=True)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
   airflow-server  | triggerer  | return _iterencode(o, 0)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
   airflow-server  | triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   airflow-server  | triggerer  | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
   airflow-server  | triggerer  | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)]
   airflow-server  | triggerer  | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'test_airflow_queue', 'data': b'{"hello":"world"}'}}, 'asset_id': 1, 'source_run_id': None, 'source_task_id': None, 'source_dag_id': None}]]
   airflow-server  | triggerer  | 2025-12-09T16:51:35.148429Z [info     ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:176
   airflow-server  | triggerer  | 2025-12-09T16:51:35.151731Z [info     ] Process exited                 [supervisor] exit_code=-2 loc=supervisor.py:709 pid=37 signal_sent=SIGINT
   airflow-server  | triggerer  | 2025-12-09T16:51:35.151835Z [info     ] Exited trigger loop            [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:181
   airflow-server  | triggerer  | Traceback (most recent call last):
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
   airflow-server  | triggerer  | context = constructor(
   airflow-server  | triggerer  | ^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
   airflow-server  | triggerer  | processors[key](compiled_params[key])
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
   airflow-server  | triggerer  | return impl_processor(process_param(value, dialect))
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
   airflow-server  | triggerer  | serialized = json_serializer(value)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
   airflow-server  | triggerer  | return _default_encoder.encode(obj)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
   airflow-server  | triggerer  | chunks = self.iterencode(o, _one_shot=True)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
   airflow-server  | triggerer  | return _iterencode(o, 0)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
   airflow-server  | triggerer  | raise TypeError(f'Object of type {o.__class__.__name__} '
   airflow-server  | triggerer  | TypeError: Object of type bytes is not JSON serializable
   airflow-server  | triggerer  | The above exception was the direct cause of the following exception:
   airflow-server  | triggerer  | Traceback (most recent call last):
   airflow-server  | triggerer  | File "/home/airflow/.local/bin/airflow", line 7, in <module>
   airflow-server  | triggerer  | sys.exit(main())
   airflow-server  | triggerer  | ^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
   airflow-server  | triggerer  | args.func(args)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
   airflow-server  | triggerer  | return func(*args, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
   airflow-server  | triggerer  | return f(*args, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
   airflow-server  | triggerer  | return func(*args, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
   airflow-server  | triggerer  | run_command_with_daemon_option(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
   airflow-server  | triggerer  | callback()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
   airflow-server  | triggerer  | callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
   airflow-server  | triggerer  | run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
   airflow-server  | triggerer  | return func(*args, session=session, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
   airflow-server  | triggerer  | return execute_job(job, execute_callable=execute_callable)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
   airflow-server  | triggerer  | ret = execute_callable()
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
   airflow-server  | triggerer  | self.trigger_runner.run()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run
   airflow-server  | triggerer  | self.handle_events()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
   airflow-server  | triggerer  | return func(*args, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events
   airflow-server  | triggerer  | Trigger.submit_event(trigger_id=trigger_id, event=event)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
   airflow-server  | triggerer  | return func(*args, session=session, **kwargs)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event
   airflow-server  | triggerer  | AssetManager.register_asset_change(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change
   airflow-server  | triggerer  | session.flush()  # Ensure the event is written earlier than DDRQ entries below.
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
   airflow-server  | triggerer  | self._flush(objects)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
   airflow-server  | triggerer  | with util.safe_reraise():
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
   airflow-server  | triggerer  | compat.raise_(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
   airflow-server  | triggerer  | raise exception
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
   airflow-server  | triggerer  | flush_context.execute()
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
   airflow-server  | triggerer  | rec.execute(self)
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
   airflow-server  | triggerer  | util.preloaded.orm_persistence.save_obj(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
   airflow-server  | triggerer  | _emit_insert_statements(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
   airflow-server  | triggerer  | result = connection._execute_20(
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
   airflow-server  | triggerer  | return meth(self, args_10style, kwargs_10style, execution_options)
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
   airflow-server  | triggerer  | return connection._execute_clauseelement(
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
   airflow-server  | triggerer  | ret = self._execute_context(
   airflow-server  | triggerer  | ^^^^^^^^^^^^^^^^^^^^^^
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
   airflow-server  | triggerer  | self._handle_dbapi_exception(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
   airflow-server  | triggerer  | util.raise_(
   airflow-server  | triggerer  | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
   airflow-server  | triggerer  | raise exception
   ```
   
   The following is my DAG file. I expect it to trigger when Redis messages arrive and then run an `EmptyOperator`.
   
   ```
   from __future__ import annotations
   
   from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.sdk import DAG, Asset, AssetWatcher
   
   # 1. Define the Trigger
   # Set the scheme to 'redis+pubsub' and specify the channel to listen on.
   REDIS_CHANNEL = "test_airflow_queue"
   
   redis_trigger = MessageQueueTrigger(
       scheme="redis+pubsub",
       channels=[REDIS_CHANNEL],
       redis_conn_id="redis_shared", # Your Airflow Connection ID
   )
   
   # 2. Define the Asset
   # The Asset represents the external event source (the Redis queue/channel).
   redis_asset = Asset(
       name="redis_queue_asset",
       watchers=[
           AssetWatcher(
               name=f"redis_watcher_{REDIS_CHANNEL}",
               trigger=redis_trigger,
           )
       ],
   )
   
   
   with DAG(
       dag_id="redis_event_driven_dag",
       # Set schedule to the redis_asset
       schedule=[redis_asset],
   ) as dag:
       # Do nothing for now
       EmptyOperator(task_id="dag_did_something")
   ```
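
   Once the trigger works, I'd expect to read the message payload in a downstream task roughly like this (a sketch based on my reading of the asset-event docs; the `triggering_asset_events` context key and the `extra["payload"]` layout are assumptions taken from the log above):

   ```
   from airflow.sdk import task

   @task
   def read_payload(triggering_asset_events=None):
       # Each triggering asset maps to a list of asset events; per the log
       # above, the trigger payload should appear under event.extra["payload"].
       for asset, events in (triggering_asset_events or {}).items():
           for event in events:
               print(asset, event.extra.get("payload"))
   ```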
   
   I am writing the following JSON to the Redis queue, encoding it as UTF-8 before I write it.
   
   ```
   {"hello":"world"}
   ```
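
   For reference, the publisher side looks roughly like this (a sketch using redis-py; the host and port are placeholders for my Valkey container):

   ```
   import redis  # redis-py, which also speaks the Valkey protocol

   client = redis.Redis(host="valkey", port=6379)  # placeholder host/port

   # PUBLISH sends raw bytes; this UTF-8-encoded JSON is what the trigger
   # receives back as b'{"hello":"world"}'.
   client.publish("test_airflow_queue", '{"hello":"world"}'.encode("utf-8"))
   ```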
   
   I attempted to set up a Redis connection in the Airflow UI that turns on decoding by adding the following to the Extra field JSON, but the error still occurs.
   
   ```
   {
     "decode_responses": true
   }
   ```
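
   For what it's worth, `decode_responses` is a real redis-py client option; whether the connection Extra actually passes it through to the hook is an assumption on my part. With plain redis-py it behaves like this:

   ```
   import redis

   # With decode_responses=True, redis-py returns pub/sub channel and data
   # as str instead of bytes.
   client = redis.Redis(host="valkey", port=6379, decode_responses=True)  # placeholder host/port
   pubsub = client.pubsub()
   pubsub.subscribe("test_airflow_queue")
   ```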
   
   ### What you think should happen instead?
   
   Sorry in advance if I'm missing something that already exists.
   
   I expected the following to happen.
   - The DAG is triggered and receives the contents of the queue message.
   
   As for how it should do that, I'm unsure. Some ideas came to mind (and sorry if these already exist):
   - Add an option that forces Redis connections to always decode responses.
   - Add a callback, for example `decode_data_function`, to `MessageQueueTrigger` (or a related class) that I could implement to decode the binary payload into something JSON-serializable. A sketch of what such a decoder could look like follows this list.
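
   A minimal sketch of such a decoder (the name `decode_data_function` above and this `decode_bytes` helper are both hypothetical; nothing like this exists in the provider as far as I can tell):

   ```
   from typing import Any

   def decode_bytes(value: Any) -> Any:
       """Recursively convert bytes to str so a payload can be JSON-serialized."""
       if isinstance(value, bytes):
           return value.decode("utf-8", errors="replace")
       if isinstance(value, dict):
           return {decode_bytes(k): decode_bytes(v) for k, v in value.items()}
       if isinstance(value, (list, tuple)):
           return [decode_bytes(v) for v in value]
       return value
   ```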
   
   ### How to reproduce
   
   Steps to reproduce:
   
   - Launch standalone Airflow in Docker, along with a Redis (Valkey) container.
   - Set up a Redis connection in the Airflow UI.
     - Set host and port to the host and port of Redis.
     - I set the Extra field JSON to the following:
       - `{"decode_responses": true}`
       - Note: I found `decode_responses` via an AI suggestion; it is a real redis-py client option, but I'm not sure the connection actually honors it.
   - Create the DAG file shown above in the "What happened?" section.
   
   - Enable the DAG in the Airflow UI.
   - Publish a message to the Redis channel.
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   ```
   airflow@airflow-server:/opt/airflow$ pip freeze | grep apache-airflow-providers
   apache-airflow-providers-amazon==9.16.0
   apache-airflow-providers-celery==3.13.0
   apache-airflow-providers-cncf-kubernetes==10.9.0
   apache-airflow-providers-common-compat==1.8.0
   apache-airflow-providers-common-io==1.6.4
   apache-airflow-providers-common-messaging==2.0.0
   apache-airflow-providers-common-sql==1.28.2
   apache-airflow-providers-docker==4.4.4
   apache-airflow-providers-elasticsearch==6.3.4
   apache-airflow-providers-fab==3.0.1
   apache-airflow-providers-ftp==3.13.2
   apache-airflow-providers-git==0.0.9
   apache-airflow-providers-google==18.1.0
   apache-airflow-providers-grpc==3.8.2
   apache-airflow-providers-hashicorp==4.3.3
   apache-airflow-providers-http==5.4.0
   apache-airflow-providers-microsoft-azure==12.8.0
   apache-airflow-providers-mysql==6.3.4
   apache-airflow-providers-odbc==4.10.2
   apache-airflow-providers-openlineage==2.7.3
   apache-airflow-providers-postgres==6.4.0
   apache-airflow-providers-redis==4.3.2
   apache-airflow-providers-sendgrid==4.1.4
   apache-airflow-providers-sftp==5.4.1
   apache-airflow-providers-slack==9.4.0
   apache-airflow-providers-smtp==2.3.1
   apache-airflow-providers-snowflake==6.6.0
   apache-airflow-providers-ssh==4.1.5
   apache-airflow-providers-standard==1.9.1
   ```
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Same versions as listed above:
   - Airflow: 3.1.3, running as standalone in the official Docker image, with the `apache-airflow-providers-redis`, `apache-airflow-providers-common-messaging`, and `redis` pip packages installed.
   - Valkey: 9.0, running in a separate container.
   
   ### Anything else?
   
   The problem always occurs, even when I publish an empty string to the Redis channel.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

