josh-fell commented on code in PR #29143:
URL: https://github.com/apache/airflow/pull/29143#discussion_r1086725027
##########
docs/apache-airflow/howto/operator/python.rst:
##########
@@ -225,11 +225,29 @@ Jinja templating can be used in same way as described for the PythonOperator.
 PythonSensor
 ============
-Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary callable for sensing. The callable
-should return True when it succeeds, False otherwise.
+A PythonSensor waits for a certain condition to be ``True``, for example to wait for a file to exist. The
+PythonSensor is available via ``@task.sensor`` and ``airflow.sensors.python.PythonSensor``. The callable
+should return a boolean ``True`` or ``False``, indicating whether a condition is met. For example:
-.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
-    :language: python
-    :dedent: 4
-    :start-after: [START example_python_sensors]
-    :end-before: [END example_python_sensors]
+.. code-block:: python
+
+    import datetime
+
+    from airflow.decorators import dag, task
+    from airflow.sensors.python import PythonSensor
+
+
+    @dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
+    def example():
+        @task.sensor
+        def wait_for_success():
+            return datetime.datetime.now().minute % 2 == 0
+
+        wait_for_success()
+        PythonSensor(task_id="wait_for_even_minute", python_callable=wait_for_success)
Review Comment:
Yeah, the `PythonSensor` task succeeds, but the callable is never actually
called.
```python
import datetime

from airflow.decorators import dag, task
from airflow.sensors.python import PythonSensor


@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
def example():
    @task.sensor
    def wait_for_success():
        print("Poking.")
        return datetime.datetime.now().minute % 2 == 0

    wait_for_success()
    PythonSensor(task_id="wait_for_even_minute", python_callable=wait_for_success)


example()
```
Looking at the task logs for both tasks, only the `@task.sensor`-decorated
task executes the `wait_for_success()` function. The DAG run was also
triggered on an odd minute, so a real poke would have returned `False`:
**wait_for_success**
<img width="545" alt="image"
src="https://user-images.githubusercontent.com/48934154/214590213-53a9382c-82bf-42da-96ad-84e076361566.png">
**wait_for_even_minute**
<img width="524" alt="image"
src="https://user-images.githubusercontent.com/48934154/214590426-d18eeb88-4245-4daf-8fa2-24b31c375a2a.png">
You can't reference a TaskFlow function as the `python_callable` for
`PythonOperator` either. The `PythonOperator` task fails with "ERROR - Object
of type PlainXComArg is not JSON serializable. If you are using pickle instead
of JSON for XCom, then you need to enable pickle support for XCom in your
airflow config or make sure to decorate your object with attr.". In either
case, Airflow should probably raise a more pertinent exception when a TaskFlow
function is passed as a `python_callable` (and for all decorators, I suppose).
+1 for separate functions.
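
To illustrate why separate functions help, here is a toy model in plain Python. It is only a sketch of one plausible explanation, which I have not traced through the Airflow source: calling a decorated function may return a truthy placeholder object instead of running the wrapped code, and a sensor that only checks truthiness would then succeed. `fake_task_sensor`, `PlaceholderArg`, `poke`, and `is_even_minute` are hypothetical stand-ins, not Airflow APIs.

```python
# Toy model only: these names are hypothetical stand-ins, not Airflow code.
calls = []  # records every time the real condition function runs


class PlaceholderArg:
    """Stand-in for the object a decorated task returns when it is called."""


def fake_task_sensor(func):
    """Hypothetical decorator: calling the result builds a task, not a result."""

    def build_task(*args, **kwargs):
        # The wrapped function is deliberately NOT executed here.
        return PlaceholderArg()

    return build_task


def poke(python_callable):
    """Hypothetical sensor check: any truthy return value counts as success."""
    return bool(python_callable())


def is_even_minute(minute=3):
    """Plain condition function; the default is an odd minute for the demo."""
    calls.append(minute)
    return minute % 2 == 0


decorated = fake_task_sensor(is_even_minute)

# Passing the decorated object: the check "succeeds" without running the condition.
decorated_result = poke(decorated)

# Passing the plain function: the condition is evaluated and fails on an odd minute.
plain_result = poke(is_even_minute)

print(decorated_result, plain_result, calls)
```

In the docs example, that would mean keeping an undecorated function to pass as `python_callable` to `PythonSensor`, and a separate `@task.sensor`-decorated function for the TaskFlow variant.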