This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 26768d9408 Update example_cloud_run.py to show job resource limit
setting (#40456)
26768d9408 is described below
commit 26768d94085e8fb6ab188be01f860f18278293cc
Author: Eugene <[email protected]>
AuthorDate: Fri Jun 28 06:43:26 2024 +0000
Update example_cloud_run.py to show job resource limit setting (#40456)
---
.../operators/cloud/cloud_run.rst | 15 +++-
.../google/cloud/cloud_run/example_cloud_run.py | 94 ++++++++++++++++++----
2 files changed, 90 insertions(+), 19 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst
b/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst
index cf90afde68..aedd807f27 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst
@@ -33,16 +33,23 @@ Create a job
Before you create a job in Cloud Run, you need to define it.
For more information about the Job object fields, visit `Google Cloud Run Job
description
<https://cloud.google.com/run/docs/reference/rpc/google.cloud.run.v2#google.cloud.run.v2.Job>`__
-A simple job configuration can look as follows:
+A simple job configuration can be created with a Job object:
.. exampleinclude::
/../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
:language: python
:dedent: 0
- :start-after: [START howto_operator_cloud_run_job_creation]
- :end-before: [END howto_operator_cloud_run_job_creation]
+ :start-after: [START howto_cloud_run_job_instance_creation]
+ :end-before: [END howto_cloud_run_job_instance_creation]
+or with a Python dictionary:
-With this configuration we can create the job:
+.. exampleinclude::
/../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_cloud_run_job_dict_creation]
+ :end-before: [END howto_cloud_run_job_dict_creation]
+
+You can create a Cloud Run Job with any of these configurations :
:class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunCreateJobOperator`
.. exampleinclude::
/../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
diff --git a/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
b/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
index 3b62e2ade9..5a82cca2a7 100644
--- a/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
+++ b/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
@@ -15,9 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-Example Airflow DAG that uses Google Cloud Run Operators.
-"""
+
+"""Example Airflow DAG that uses Google Cloud Run Operators."""
from __future__ import annotations
@@ -39,7 +38,7 @@ from airflow.providers.google.cloud.operators.cloud_run
import (
from airflow.utils.trigger_rule import TriggerRule
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "example_cloud_run"
+DAG_ID = "cloud_run"
region = "us-central1"
job_name_prefix = "cloudrun-system-test-job"
@@ -117,24 +116,89 @@ def _assert_jobs(ti):
def _assert_one_job(ti):
job_dicts = ti.xcom_pull(task_ids=[list_jobs_limit_task_name],
key="return_value")
-
assert len(job_dicts[0]) == 1
-# [START howto_operator_cloud_run_job_creation]
-def _create_job():
+# [START howto_cloud_run_job_instance_creation]
+def _create_job_instance() -> Job:
+ """
+ Create a Cloud Run job configuration with google.cloud.run_v2.Job object.
+
+ As a minimum the configuration must contain a container image name in its
template.
+ The rest of the configuration parameters are optional and will be
populated with default values if not set.
+ """
job = Job()
container = k8s_min.Container()
container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
+ container.resources.limits = {"cpu": "2", "memory": "1Gi"}
job.template.template.containers.append(container)
return job
-# [END howto_operator_cloud_run_job_creation]
+# [END howto_cloud_run_job_instance_creation]
+
+
+# [START howto_cloud_run_job_dict_creation]
+def _create_job_dict() -> dict:
+ """
+ Create a Cloud Run job configuration with a Python dict.
+
+ As a minimum the configuration must contain a container image name in its
template.
+ """
+ return {
+ "template": {
+ "template": {
+ "containers": [
+ {
+ "image":
"us-docker.pkg.dev/cloudrun/container/job:latest",
+ "resources": {
+ "limits": {"cpu": "1", "memory": "512Mi"},
+ "cpu_idle": False,
+ "startup_cpu_boost": False,
+ },
+ "name": "",
+ "command": [],
+ "args": [],
+ "env": [],
+ "ports": [],
+ "volume_mounts": [],
+ "working_dir": "",
+ "depends_on": [],
+ }
+ ],
+ "volumes": [],
+ "execution_environment": 0,
+ "encryption_key": "",
+ },
+ "labels": {},
+ "annotations": {},
+ "parallelism": 0,
+ "task_count": 0,
+ },
+ "name": "",
+ "uid": "",
+ "generation": "0",
+ "labels": {},
+ "annotations": {},
+ "creator": "",
+ "last_modifier": "",
+ "client": "",
+ "client_version": "",
+ "launch_stage": 0,
+ "observed_generation": "0",
+ "conditions": [],
+ "execution_count": 0,
+ "reconciling": False,
+ "satisfies_pzs": False,
+ "etag": "",
+ }
+
+
+# [END howto_cloud_run_job_dict_creation]
-def _create_job_with_label():
- job = _create_job()
+def _create_job_instance_with_label():
+ job = _create_job_instance()
job.labels = {"somelabel": "label1"}
return job
@@ -144,7 +208,7 @@ with DAG(
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "cloud", "run"],
) as dag:
# [START howto_operator_cloud_run_create_job]
create1 = CloudRunCreateJobOperator(
@@ -152,7 +216,7 @@ with DAG(
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
- job=_create_job(),
+ job=_create_job_instance(),
dag=dag,
)
# [END howto_operator_cloud_run_create_job]
@@ -162,7 +226,7 @@ with DAG(
project_id=PROJECT_ID,
region=region,
job_name=job2_name,
- job=Job.to_dict(_create_job()),
+ job=_create_job_dict(),
dag=dag,
)
@@ -171,7 +235,7 @@ with DAG(
project_id=PROJECT_ID,
region=region,
job_name=job3_name,
- job=Job.to_dict(_create_job()),
+ job=Job.to_dict(_create_job_instance()),
dag=dag,
)
@@ -249,7 +313,7 @@ with DAG(
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
- job=_create_job_with_label(),
+ job=_create_job_instance_with_label(),
dag=dag,
)
# [END howto_operator_cloud_update_job]