jon-petty-imperfect opened a new issue, #59248:
URL: https://github.com/apache/airflow/issues/59248
### Apache Airflow version
3.1.3
### If "Other Airflow 2/3 version" selected, which one?
_No response_
### What happened?
**Summary**
I am trying to set up a DAG that triggers when messages are dropped into a
Valkey queue.
I expect the following to happen.
- I write a message to the Valkey Queue with some JSON in it.
- The Airflow MessageQueueTrigger sees the message in Valkey, and it
triggers the DAG.
The following appears to be happening.
- I write a message to the Valkey Queue with some JSON in it.
- Airflow reads the message as binary. It attempts to encode the binary to
JSON so that it can write an `asset_event` to the Airflow database.
- An exception is thrown because binary cannot be encoded to JSON.
The following are the versions.
- Airflow: 3.1.3
  - Running as standalone in the official docker container.
  - The following pips are installed.
    - `apache-airflow-providers-redis`
    - `apache-airflow-providers-common-messaging`
    - `redis`
- Valkey: 9.0
  - Running in a separate container.
**Extra Information**
The part of the Exception that makes me suspect it's an encoding thing is
shown below. It appears to be reading the `channel` and the `data` as binary,
and the exception is saying bytes aren't JSON serializable.
```
airflow-server | triggerer | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
airflow-server | triggerer | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)]
airflow-server | triggerer | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'test_airflow_queue', 'data': b'{"hello":"world"}'}}, 'asset_id': 1, 'source_run_id': None, 'source_task_id': None, 'source_dag_id': None}]]
```
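The serialization failure itself can be reproduced outside Airflow with only the standard library. The following is a minimal sketch, assuming the payload is exactly the dict shown in the `[parameters: ...]` line above; `try_dump` is a helper made up for this illustration and is not Airflow code.

```python
import json

# Payload copied from the `[parameters: ...]` line of the exception.
# Note that both `channel` and `data` are bytes, not str.
payload = {
    "type": "message",
    "pattern": None,
    "channel": b"test_airflow_queue",
    "data": b'{"hello":"world"}',
}


def try_dump(value):
    """Return the JSON string, or the TypeError message if encoding fails."""
    try:
        return json.dumps(value)
    except TypeError as exc:
        return f"TypeError: {exc}"


# Mirrors the `extra` column value that the INSERT tries to serialize.
result = try_dump({"from_trigger": True, "payload": payload})
print(result)  # TypeError: Object of type bytes is not JSON serializable
```

This suggests the problem is the `bytes` values in the event payload rather than the content of the message.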
The full exception block is the following.
```
airflow-server | triggerer | 2025-12-09T16:51:35.143456Z [error ]
Exception when executing TriggerRunnerSupervisor.run
[airflow.jobs.triggerer_job_runner.TriggererJobRunner]
loc=triggerer_job_runner.py:173
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1810, in _execute_context
airflow-server | triggerer | context = constructor(
airflow-server | triggerer | ^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
line 1078, in _init_compiled
airflow-server | triggerer | processors[key](compiled_params[key])
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py",
line 1668, in process
airflow-server | triggerer | return impl_processor(process_param(value,
dialect))
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py",
line 2669, in process
airflow-server | triggerer | serialized = json_serializer(value)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
airflow-server | triggerer | return _default_encoder.encode(obj)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
airflow-server | triggerer | chunks = self.iterencode(o, _one_shot=True)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
airflow-server | triggerer | return _iterencode(o, 0)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 180, in default
airflow-server | triggerer | raise TypeError(f'Object of type
{o.__class__.__name__} '
airflow-server | triggerer | TypeError: Object of type bytes is not JSON
serializable
airflow-server | triggerer | The above exception was the direct cause of
the following exception:
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 170, in _execute
airflow-server | triggerer | self.trigger_runner.run()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 531, in run
airflow-server | triggerer | self.handle_events()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 558, in handle_events
airflow-server | triggerer | Trigger.submit_event(trigger_id=trigger_id,
event=event)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py",
line 100, in wrapper
airflow-server | triggerer | return func(*args, session=session, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py",
line 260, in submit_event
airflow-server | triggerer | AssetManager.register_asset_change(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py",
line 161, in register_asset_change
airflow-server | triggerer | session.flush() # Ensure the event is
written earlier than DDRQ entries below.
airflow-server | triggerer | ^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3449, in flush
airflow-server | triggerer | self._flush(objects)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3588, in _flush
airflow-server | triggerer | with util.safe_reraise():
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
airflow-server | triggerer | compat.raise_(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
airflow-server | triggerer | raise exception
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3549, in _flush
airflow-server | triggerer | flush_context.execute()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 456, in execute
airflow-server | triggerer | rec.execute(self)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 630, in execute
airflow-server | triggerer | util.preloaded.orm_persistence.save_obj(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 245, in save_obj
airflow-server | triggerer | _emit_insert_statements(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 1238, in _emit_insert_statements
airflow-server | triggerer | result = connection._execute_20(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1710, in _execute_20
airflow-server | triggerer | return meth(self, args_10style,
kwargs_10style, execution_options)
airflow-server | triggerer |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py",
line 334, in _execute_on_connection
airflow-server | triggerer | return connection._execute_clauseelement(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1577, in _execute_clauseelement
airflow-server | triggerer | ret = self._execute_context(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1816, in _execute_context
airflow-server | triggerer | self._handle_dbapi_exception(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 2134, in _handle_dbapi_exception
airflow-server | triggerer | util.raise_(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
airflow-server | triggerer | raise exception
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1810, in _execute_context
airflow-server | triggerer | context = constructor(
airflow-server | triggerer | ^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
line 1078, in _init_compiled
airflow-server | triggerer | processors[key](compiled_params[key])
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py",
line 1668, in process
airflow-server | triggerer | return impl_processor(process_param(value,
dialect))
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py",
line 2669, in process
airflow-server | triggerer | serialized = json_serializer(value)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
airflow-server | triggerer | return _default_encoder.encode(obj)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
airflow-server | triggerer | chunks = self.iterencode(o, _one_shot=True)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
airflow-server | triggerer | return _iterencode(o, 0)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 180, in default
airflow-server | triggerer | raise TypeError(f'Object of type
{o.__class__.__name__} '
airflow-server | triggerer | sqlalchemy.exc.StatementError:
(builtins.TypeError) Object of type bytes is not JSON serializable
airflow-server | triggerer | [SQL: INSERT INTO asset_event (asset_id,
extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (?, ?,
?, ?, ?, ?)]
airflow-server | triggerer | [parameters: [{'extra': {'from_trigger':
True, 'payload': {'type': 'message', 'pattern': None, 'channel':
b'test_airflow_queue', 'data': b'{"hello":"world"}'}}, 'asset_id': 1,
'source_run_id': None, 'source_task_id': None, 'source_dag_id': None}]]
airflow-server | triggerer | 2025-12-09T16:51:35.148429Z [info ]
Waiting for triggers to clean up
[airflow.jobs.triggerer_job_runner.TriggererJobRunner]
loc=triggerer_job_runner.py:176
airflow-server | triggerer | 2025-12-09T16:51:35.151731Z [info ]
Process exited [supervisor] exit_code=-2 loc=supervisor.py:709
pid=37 signal_sent=SIGINT
airflow-server | triggerer | 2025-12-09T16:51:35.151835Z [info ]
Exited trigger loop
[airflow.jobs.triggerer_job_runner.TriggererJobRunner]
loc=triggerer_job_runner.py:181
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1810, in _execute_context
airflow-server | triggerer | context = constructor(
airflow-server | triggerer | ^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py",
line 1078, in _init_compiled
airflow-server | triggerer | processors[key](compiled_params[key])
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py",
line 1668, in process
airflow-server | triggerer | return impl_processor(process_param(value,
dialect))
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py",
line 2669, in process
airflow-server | triggerer | serialized = json_serializer(value)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
airflow-server | triggerer | return _default_encoder.encode(obj)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
airflow-server | triggerer | chunks = self.iterencode(o, _one_shot=True)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
airflow-server | triggerer | return _iterencode(o, 0)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/usr/python/lib/python3.12/json/encoder.py", line 180, in default
airflow-server | triggerer | raise TypeError(f'Object of type
{o.__class__.__name__} '
airflow-server | triggerer | TypeError: Object of type bytes is not JSON
serializable
airflow-server | triggerer | The above exception was the direct cause of
the following exception:
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File "/home/airflow/.local/bin/airflow", line
7, in <module>
airflow-server | triggerer | sys.exit(main())
airflow-server | triggerer | ^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line
55, in main
airflow-server | triggerer | args.func(args)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py",
line 49, in command
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line
114, in wrapper
airflow-server | triggerer | return f(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py",
line 54, in wrapped_function
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
line 69, in triggerer
airflow-server | triggerer | run_command_with_daemon_option(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
airflow-server | triggerer | callback()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
line 72, in <lambda>
airflow-server | triggerer | callback=lambda:
triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
airflow-server | triggerer |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py",
line 55, in triggerer_run
airflow-server | triggerer | run_job(job=triggerer_job_runner.job,
execute_callable=triggerer_job_runner._execute)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py",
line 100, in wrapper
airflow-server | triggerer | return func(*args, session=session, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line
368, in run_job
airflow-server | triggerer | return execute_job(job,
execute_callable=execute_callable)
airflow-server | triggerer |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line
397, in execute_job
airflow-server | triggerer | ret = execute_callable()
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 170, in _execute
airflow-server | triggerer | self.trigger_runner.run()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 531, in run
airflow-server | triggerer | self.handle_events()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py",
line 58, in wrapper
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
line 558, in handle_events
airflow-server | triggerer | Trigger.submit_event(trigger_id=trigger_id,
event=event)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py",
line 100, in wrapper
airflow-server | triggerer | return func(*args, session=session, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py",
line 260, in submit_event
airflow-server | triggerer | AssetManager.register_asset_change(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py",
line 161, in register_asset_change
airflow-server | triggerer | session.flush() # Ensure the event is
written earlier than DDRQ entries below.
airflow-server | triggerer | ^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3449, in flush
airflow-server | triggerer | self._flush(objects)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3588, in _flush
airflow-server | triggerer | with util.safe_reraise():
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py",
line 70, in __exit__
airflow-server | triggerer | compat.raise_(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
airflow-server | triggerer | raise exception
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py",
line 3549, in _flush
airflow-server | triggerer | flush_context.execute()
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 456, in execute
airflow-server | triggerer | rec.execute(self)
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py",
line 630, in execute
airflow-server | triggerer | util.preloaded.orm_persistence.save_obj(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 245, in save_obj
airflow-server | triggerer | _emit_insert_statements(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py",
line 1238, in _emit_insert_statements
airflow-server | triggerer | result = connection._execute_20(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1710, in _execute_20
airflow-server | triggerer | return meth(self, args_10style,
kwargs_10style, execution_options)
airflow-server | triggerer |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py",
line 334, in _execute_on_connection
airflow-server | triggerer | return connection._execute_clauseelement(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1577, in _execute_clauseelement
airflow-server | triggerer | ret = self._execute_context(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 1816, in _execute_context
airflow-server | triggerer | self._handle_dbapi_exception(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py",
line 2134, in _handle_dbapi_exception
airflow-server | triggerer | util.raise_(
airflow-server | triggerer | File
"/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py",
line 211, in raise_
airflow-server | triggerer | raise exception
```
The following is my DAG file. I expect it to trigger when Redis messages are
added and then run an `EmptyOperator`.
```
from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.providers.common.messaging.triggers.msg_queue import (
    MessageQueueTrigger,
)
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher, chain

# 1. Define the Trigger
# Set the scheme to 'redis+pubsub' and specify the channel to listen on.
REDIS_CHANNEL = "test_airflow_queue"
redis_trigger = MessageQueueTrigger(
    scheme="redis+pubsub",
    channels=[REDIS_CHANNEL],
    redis_conn_id="redis_shared",  # Your Airflow Connection ID
)

# 2. Define the Asset
# The Asset represents the external event source (the Redis queue/channel).
redis_asset = Asset(
    name="redis_queue_asset",
    watchers=[
        AssetWatcher(
            name=f"redis_watcher_{REDIS_CHANNEL}",
            trigger=redis_trigger,
        )
    ],
)

with DAG(
    dag_id="redis_event_driven_dag",
    # Set schedule to the redis_asset
    schedule=[redis_asset],
) as dag:
    # Do nothing for now
    EmptyOperator(task_id="dag_did_something")
```
I am writing the following JSON to the Redis Queue, and I'm encoding it as
UTF-8 before I write it.
```
{"hello":"world"}
```
I attempted to set up a Redis Connection in the Airflow UI that turns on
decoding by adding the following to the Extra Field JSON.
```
{
"decode_responses": true
}
```
### What you think should happen instead?
Sorry in advance if I'm missing something that already exists.
I thought one or more of the following should have happened.
- The DAG is triggered and it gets the contents of the queue-message.
As for how it does that, I'm unsure. Some ideas came to mind. And sorry if
these already exist.
- Make an option to force Redis connections to always decode.
- Add a callback function called `decode_data_function` (or something) to
the MessageQueueTrigger (or some related class) that I can implement. I could
implement a function that decodes the binary data into something that can be
encoded to JSON.
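To make the second idea concrete, the following is a minimal sketch of what such a decode callback might look like. It assumes UTF-8 messages and the payload shape shown in the exception above. The name `decode_payload` is hypothetical, and nothing here is an existing `MessageQueueTrigger` parameter or Airflow API.

```python
import json
from typing import Any


def decode_payload(value: Any, encoding: str = "utf-8") -> Any:
    """Recursively replace bytes with str so the result is JSON serializable."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    if isinstance(value, dict):
        return {
            decode_payload(key, encoding): decode_payload(item, encoding)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [decode_payload(item, encoding) for item in value]
    return value


# Raw event as it appears in the exception (bytes for channel and data).
raw = {
    "type": "message",
    "pattern": None,
    "channel": b"test_airflow_queue",
    "data": b'{"hello":"world"}',
}

decoded = decode_payload(raw)
serialized = json.dumps(decoded)       # no longer raises TypeError
message = json.loads(decoded["data"])  # the original JSON message as a dict
print(message)
```

A callback like this would run before the event is handed to the asset manager, so the `extra` column only ever receives strings.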
### How to reproduce
Steps to reproduce
- Launch the Standalone Airflow in Docker, along with a Redis container.
- Set up a Redis Connection in the Airflow Website
  - Set host and port to the host and port of Redis
  - I set Extra Fields JSON to the following
    - `{"decode_responses": true}`
    - Note, I found `decode_responses` using AI, but I suspect it's
      hallucinating.
- Create the following DAG
```
from __future__ import annotations

import pendulum
from airflow.decorators import task
from airflow.providers.common.messaging.triggers.msg_queue import (
    MessageQueueTrigger,
)
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher, chain

# 1. Define the Trigger
# Set the scheme to 'redis+pubsub' and specify the channel to listen on.
REDIS_CHANNEL = "test_airflow_queue"
redis_trigger = MessageQueueTrigger(
    scheme="redis+pubsub",
    channels=[REDIS_CHANNEL],
    redis_conn_id="redis_shared",  # Your Airflow Connection ID
)

# 2. Define the Asset
# The Asset represents the external event source (the Redis queue/channel).
redis_asset = Asset(
    name="redis_queue_asset",
    watchers=[
        AssetWatcher(
            name=f"redis_watcher_{REDIS_CHANNEL}",
            trigger=redis_trigger,
        )
    ],
)

with DAG(
    dag_id="redis_event_driven_dag",
    # Set schedule to the redis_asset
    schedule=[redis_asset],
) as dag:
    # Do nothing for now
    EmptyOperator(task_id="dag_did_something")
```
- Turn the DAG on in the Airflow Website
- Drop a message into the Redis Queue
### Operating System
Debian GNU/Linux 12 (bookworm)
### Versions of Apache Airflow Providers
```
airflow@airflow-server:/opt/airflow$ pip freeze | grep apache-airflow-providers
apache-airflow-providers-amazon==9.16.0
apache-airflow-providers-celery==3.13.0
apache-airflow-providers-cncf-kubernetes==10.9.0
apache-airflow-providers-common-compat==1.8.0
apache-airflow-providers-common-io==1.6.4
apache-airflow-providers-common-messaging==2.0.0
apache-airflow-providers-common-sql==1.28.2
apache-airflow-providers-docker==4.4.4
apache-airflow-providers-elasticsearch==6.3.4
apache-airflow-providers-fab==3.0.1
apache-airflow-providers-ftp==3.13.2
apache-airflow-providers-git==0.0.9
apache-airflow-providers-google==18.1.0
apache-airflow-providers-grpc==3.8.2
apache-airflow-providers-hashicorp==4.3.3
apache-airflow-providers-http==5.4.0
apache-airflow-providers-microsoft-azure==12.8.0
apache-airflow-providers-mysql==6.3.4
apache-airflow-providers-odbc==4.10.2
apache-airflow-providers-openlineage==2.7.3
apache-airflow-providers-postgres==6.4.0
apache-airflow-providers-redis==4.3.2
apache-airflow-providers-sendgrid==4.1.4
apache-airflow-providers-sftp==5.4.1
apache-airflow-providers-slack==9.4.0
apache-airflow-providers-smtp==2.3.1
apache-airflow-providers-snowflake==6.6.0
apache-airflow-providers-ssh==4.1.5
apache-airflow-providers-standard==1.9.1
```
### Deployment
Docker-Compose
### Deployment details
The following are the versions.
- Airflow: 3.1.3
  - Running as standalone in the official docker container.
  - The following pips are installed.
    - `apache-airflow-providers-redis`
    - `apache-airflow-providers-common-messaging`
    - `redis`
- Valkey: 9.0
  - Running in a separate container.
### Anything else?
The problem always occurs, even when I drop an empty string into the Redis
queue.
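An empty message failing as well is consistent with the `bytes` explanation. The sketch below assumes an empty message arrives as `b""` and that the channel name is delivered as bytes, as it is in the payload from the exception; `is_json_serializable` is a helper written only for this illustration.

```python
import json


def is_json_serializable(value) -> bool:
    """Return True if json.dumps accepts the value, False on TypeError."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


# Assumed shape of an event for an empty message: still bytes, just empty.
empty_bytes_event = {"channel": b"test_airflow_queue", "data": b""}
# The same event if both fields had been decoded to str first.
empty_str_event = {"channel": "test_airflow_queue", "data": ""}

bytes_ok = is_json_serializable(empty_bytes_event)
str_ok = is_json_serializable(empty_str_event)
print(bytes_ok, str_ok)  # False True
```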
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)