jeffvswanson opened a new issue #17834:
URL: https://github.com/apache/airflow/issues/17834


   <!--
   Welcome to Apache Airflow!
   
   Please complete the next sections or the issue will be closed.
   -->
   
   **Apache Airflow version**: 2.0.2
   
   <!-- AIRFLOW VERSION IS MANDATORY -->
   
   **OS**: Amazon Linux AMI
   
   <!-- MANDATORY! You can get it via `cat /etc/os-release` for example -->
   
   **Apache Airflow Provider versions**: apache-airflow-providers-amazon, v2.1.0
   
   <!-- You can use `pip freeze | grep apache-airflow-providers` (you can leave 
only relevant ones)-->
   
   **Deployment**: AWS MWAA
   
   <!-- e.g. Virtualenv / VM / Docker-compose / K8S / Helm Chart / Managed 
Airflow Service -->
   
   <!-- Please include your deployment tools and versions: docker-compose, k8s, 
helm, etc -->
   
   **What happened**: A Smart Python Sensor set up in AWS MWAA always falls back to running as a normal Python Sensor task, because `SensorInstance` cannot JSON-serialize the python callable.
   
   Error message
   ```
   [2021-08-25 16:41:57,686] {{taskinstance.py:1301}} WARNING - Failed to 
register in sensor service.Continue to run task in non smart sensor mode.
   [2021-08-25 16:41:57,705] {{taskinstance.py:1303}} ERROR - Object of type 
function is not JSON serializable
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 
1298, in _prepare_and_execute_task_with_callbacks
       registered = task_copy.register_in_sensor_service(self, context)
     File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base.py", 
line 163, in register_in_sensor_service
       return SensorInstance.register(ti, poke_context, execution_context)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", 
line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/models/sensorinstance.py", line 
114, in register
       encoded_poke = json.dumps(poke_context)
     File "/usr/lib64/python3.7/json/__init__.py", line 231, in dumps
       return _default_encoder.encode(obj)
     File "/usr/lib64/python3.7/json/encoder.py", line 199, in encode
       chunks = self.iterencode(o, _one_shot=True)
     File "/usr/lib64/python3.7/json/encoder.py", line 257, in iterencode
       return _iterencode(o, 0)
     File "/usr/lib64/python3.7/json/encoder.py", line 179, in default
       raise TypeError(f'Object of type {o.__class__.__name__} '
   TypeError: Object of type function is not JSON serializable
   [2021-08-25 16:41:57,726] {{python.py:72}} INFO - Poking callable: <function 
_sla_check at 0x7f57ee835f80>
   ```
   <!-- Please include exact error messages if you can -->
   
   **What you expected to happen**: I expected a `PythonSensor` to be registrable as a smart sensor with the appropriate legwork, just as the Amazon provider sensors can be registered as smart sensors.
   
   <!-- What do you think went wrong? -->
   
   **How to reproduce it**:
   1. Set up Apache Airflow environment configuration to support a 
`SmartPythonSensor` with `poke_context_fields` of `("python_callable", 
"op_kwargs")`.
   2. Create a child class, `SmartPythonSensor` that inherits from 
`PythonSensor` registered to be smart sensor compatible.
   3. Write a test DAG with a `SmartPythonSensor` task with a simple python 
callable.
   4. Watch the logs as the test DAG complains the `SmartPythonSensor` cannot 
be registered with the sensor service because the python callable the 
`SmartPythonSensor` relies upon cannot be JSON serialized.
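Step 1 above might look like the following configuration fragment. The section and key names follow the Airflow 2.0 smart sensor documentation; the values are placeholders, and on MWAA they would be supplied as configuration overrides (e.g. `smart_sensor.use_smart_sensor`):

```ini
[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000
shards = 2
sensors_enabled = SmartPythonSensor
```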
   
   <!--
   As minimally and precisely as possible. Keep in mind we do not have access 
to your cluster or dags.
   If this is a UI bug, please provide a screenshot of the bug or a link to a 
youtube video of the bug in action
   You can include images/screen-casts etc. by drag-dropping the image here.
   -->
   
   **Anything else we need to know**:
   A registered smart sensor is downgraded to a regular sensor task every time a DAG with a `SmartPythonSensor` runs, because `SensorInstance` cannot serialize a python callable.
   
   My `SmartPythonSensor` class:
   ```python
   from airflow.sensors.python import PythonSensor
   from airflow.utils.decorators import apply_defaults
   
   
   class SmartPythonSensor(PythonSensor):
       poke_context_fields = ("python_callable", "op_kwargs")
   
       @apply_defaults
       def __init__(self, **kwargs):
           super().__init__(**kwargs)
   
       def is_smart_sensor_compatible(self):
           # Smart sensor cannot have on success callback
           self.on_success_callback = None
   
           # Smart sensor cannot have on retry callback
           self.on_retry_callback = None
   
           # Smart sensor cannot have on failure callback
           self.on_failure_callback = None
   
           # Smart sensor cannot mark the task as SKIPPED on failure
           self.soft_fail = False
           return super().is_smart_sensor_compatible()
   ```
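One way around the serialization limit (a sketch, not part of this issue's code) would be to register the callable by its importable dotted path, which is a plain string and therefore JSON-safe. The helper names below are hypothetical:

```python
import importlib
import json

def callable_path(fn):
    """Encode a module-level function as an importable dotted path."""
    return f"{fn.__module__}.{fn.__name__}"

def resolve_callable(path):
    """Re-import a function from the dotted path produced above."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)

# The string form serializes where the raw function object does not.
encoded = json.dumps({"python_callable": callable_path(json.dumps)})
```

This only works for module-level functions (lambdas and closures have no importable path), which matches the restriction smart sensors would need anyway.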
   <!--
   How often does this problem occur? Once? Every time etc?
   Any relevant logs to include? Put them here inside fenced
   ``` ``` blocks or inside a foldable details tag if it's long:
   <details><summary>x.log</summary> lots of stuff </details>
   -->
   
   **Are you willing to submit a PR?**
   
   <!---
   This is absolutely not required, but we are happy to guide you in 
contribution process
   especially if you already have a good understanding of how to implement the 
fix.
   Airflow is a community-managed project and we love to bring new contributors 
in.
   Find us in #airflow-how-to-pr on Slack!
    -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

