jeffvswanson opened a new issue #17834:
URL: https://github.com/apache/airflow/issues/17834
<!--
Welcome to Apache Airflow!
Please complete the next sections or the issue will be closed.
-->
**Apache Airflow version**: 2.0.2
<!-- AIRFLOW VERSION IS MANDATORY -->
**OS**: Amazon Linux AMI
<!-- MANDATORY! You can get it via `cat /etc/os-release` for example -->
**Apache Airflow Provider versions**: apache-airflow-providers-amazon, v2.1.0
<!-- You can use `pip freeze | grep apache-airflow-providers` (you can leave
only relevant ones)-->
**Deployment**: AWS MWAA
<!-- e.g. Virtualenv / VM / Docker-compose / K8S / Helm Chart / Managed
Airflow Service -->
<!-- Please include your deployment tools and versions: docker-compose, k8s,
helm, etc -->
**What happened**: A Smart Python Sensor set up in AWS MWAA always falls back
to running as a normal Python Sensor task, because `SensorInstance` cannot
JSON-serialize the Python callable.
Error message:
```
[2021-08-25 16:41:57,686] {{taskinstance.py:1301}} WARNING - Failed to register in sensor service.Continue to run task in non smart sensor mode.
[2021-08-25 16:41:57,705] {{taskinstance.py:1303}} ERROR - Object of type function is not JSON serializable
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1298, in _prepare_and_execute_task_with_callbacks
    registered = task_copy.register_in_sensor_service(self, context)
  File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base.py", line 163, in register_in_sensor_service
    return SensorInstance.register(ti, poke_context, execution_context)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/sensorinstance.py", line 114, in register
    encoded_poke = json.dumps(poke_context)
  File "/usr/lib64/python3.7/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/lib64/python3.7/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib64/python3.7/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/lib64/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type function is not JSON serializable
[2021-08-25 16:41:57,726] {{python.py:72}} INFO - Poking callable: <function _sla_check at 0x7f57ee835f80>
```
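The root cause is easy to reproduce outside Airflow: `json.dumps` has no encoder for function objects, so any `poke_context` that still contains the raw callable fails exactly as the traceback shows. A minimal sketch (the callable name is just an illustration):

```python
import json

def _sla_check():
    return True

# A poke_context as SensorInstance.register would receive it: the raw
# function object cannot be JSON-encoded.
poke_context = {"python_callable": _sla_check, "op_kwargs": {}}

try:
    json.dumps(poke_context)
except TypeError as err:
    print(err)  # Object of type function is not JSON serializable
```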
<!-- Please include exact error messages if you can -->
**What you expected to happen**: I expected a Python Sensor to be usable as a
Smart Sensor with the appropriate legwork, just as the Amazon provider
sensors can be registered as smart sensor compatible.
<!-- What do you think went wrong? -->
**How to reproduce it**:
1. Set up Apache Airflow environment configuration to support a
`SmartPythonSensor` with `poke_context_fields` of `("python_callable",
"op_kwargs")`.
2. Create a child class, `SmartPythonSensor`, that inherits from
`PythonSensor` and is registered as smart sensor compatible.
3. Write a test DAG with a `SmartPythonSensor` task with a simple python
callable.
4. Watch the logs as the test DAG complains that the `SmartPythonSensor`
cannot be registered with the sensor service, because the Python callable it
relies upon cannot be JSON serialized.
<!--
As minimally and precisely as possible. Keep in mind we do not have access
to your cluster or dags.
If this is a UI bug, please provide a screenshot of the bug or a link to a
youtube video of the bug in action
You can include images/screen-casts etc. by drag-dropping the image here.
-->
**Anything else we need to know**:
The downgrade from registered smart sensor to regular sensor task occurs on
every run of a DAG containing a `SmartPythonSensor`, because `SensorInstance`
cannot serialize a Python callable.
My `SmartPythonSensor` class:
```
from airflow.sensors.python import PythonSensor
from airflow.utils.decorators import apply_defaults


class SmartPythonSensor(PythonSensor):
    poke_context_fields = ("python_callable", "op_kwargs")

    @apply_defaults
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def is_smart_sensor_compatible(self):
        # Smart sensor cannot have an on-success callback
        self.on_success_callback = None
        # Smart sensor cannot have an on-retry callback
        self.on_retry_callback = None
        # Smart sensor cannot have an on-failure callback
        self.on_failure_callback = None
        # Smart sensor cannot mark the task as SKIPPED on failure
        self.soft_fail = False
        return super().is_smart_sensor_compatible()
```
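One possible direction (not an Airflow API; the helper names below are hypothetical) would be to serialize the callable by its importable dotted path, which is a plain string and therefore JSON-serializable, and resolve it back to the function at poke time:

```python
import importlib
import json

# Hypothetical helpers, not part of Airflow: encode a module-level callable
# as an importable "module.qualname" string, and resolve it back.

def callable_to_path(fn):
    return f"{fn.__module__}.{fn.__qualname__}"

def path_to_callable(path):
    module_name, _, fn_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), fn_name)

# The poke_context now holds only strings, so json.dumps succeeds.
poke_context = {"python_callable": callable_to_path(json.dumps), "op_kwargs": {}}
encoded = json.dumps(poke_context)

# Round trip recovers the original function object.
assert path_to_callable(poke_context["python_callable"]) is json.dumps
```

Note the limitation: this only works for callables importable by dotted path (module-level functions), not lambdas or closures, which is consistent with the shard worker needing to re-import the callable in a different process.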
<!--
How often does this problem occur? Once? Every time etc?
Any relevant logs to include? Put them here inside fenced
``` ``` blocks or inside a foldable details tag if it's long:
<details><summary>x.log</summary> lots of stuff </details>
-->
**Are you willing to submit a PR?**
<!---
This is absolutely not required, but we are happy to guide you in
contribution process
especially if you already have a good understanding of how to implement the
fix.
Airflow is a community-managed project and we love to bring new contributors
in.
Find us in #airflow-how-to-pr on Slack!
-->