This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eab0e5ac43 Add docs for livy deferrable operator (#30397)
eab0e5ac43 is described below
commit eab0e5ac439b3fa8f1f25ca9cb49d1933770c6e8
Author: Phani Kumar <[email protected]>
AuthorDate: Wed Apr 5 13:48:36 2023 +0530
Add docs for livy deferrable operator (#30397)
* Add docs for livy deferrable
* Add docs for livy deferrable
* Apply review suggestions
* Fix example DAG
---
.../operators.rst | 9 +++++++++
tests/system/providers/apache/livy/example_livy.py | 19 +++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/docs/apache-airflow-providers-apache-livy/operators.rst
b/docs/apache-airflow-providers-apache-livy/operators.rst
index 09d97bbbdb..5ef0e0b71a 100644
--- a/docs/apache-airflow-providers-apache-livy/operators.rst
+++ b/docs/apache-airflow-providers-apache-livy/operators.rst
@@ -34,6 +34,15 @@ This operator wraps the Apache Livy batch REST API, allowing
to submit a Spark a
:start-after: [START create_livy]
:end-before: [END create_livy]
+You can also run this operator in deferrable mode by setting the parameter
``deferrable`` to True.
+This will lead to efficient utilization of Airflow workers as polling for job
status happens on
+the triggerer asynchronously. Note that this will need triggerer to be
available on your Airflow deployment.
+
+.. exampleinclude:: /../../tests/system/providers/apache/livy/example_livy.py
+ :language: python
+ :start-after: [START create_livy_deferrable]
+ :end-before: [END create_livy_deferrable]
+
Reference
"""""""""
diff --git a/tests/system/providers/apache/livy/example_livy.py
b/tests/system/providers/apache/livy/example_livy.py
index 8b67b720bb..9bc8241f0c 100644
--- a/tests/system/providers/apache/livy/example_livy.py
+++ b/tests/system/providers/apache/livy/example_livy.py
@@ -54,6 +54,25 @@ with DAG(
livy_java_task >> livy_python_task
# [END create_livy]
+ # [START create_livy_deferrable]
+ livy_java_task_deferrable = LivyOperator(
+ task_id="livy_java_task_deferrable",
+ file="/spark-examples.jar",
+ num_executors=1,
+ conf={
+ "spark.shuffle.compress": "false",
+ },
+ class_name="org.apache.spark.examples.SparkPi",
+ deferrable=True,
+ )
+
+ livy_python_task_deferrable = LivyOperator(
+ task_id="livy_python_task_deferrable", file="/pi.py",
polling_interval=60, deferrable=True
+ )
+
+ livy_java_task_deferrable >> livy_python_task_deferrable
+ # [END create_livy_deferrable]
+
from tests.system.utils.watcher import watcher
# This test needs watcher in order to properly mark success/failure