This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bcb026bf7c use the proper key to retrieve the dataflow job_id (#27336)
bcb026bf7c is described below
commit bcb026bf7c8031ff64c8b6019d248b12d6aa71e0
Author: Deji Ibrahim <[email protected]>
AuthorDate: Mon Oct 31 05:54:59 2022 +0100
use the proper key to retrieve the dataflow job_id (#27336)
---
airflow/providers/google/cloud/example_dags/example_dataflow.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index 95fd2e4f04..1364de183e 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -175,7 +175,7 @@ with models.DAG(
# [START howto_sensor_wait_for_job_status]
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
- job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
+ job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location="europe-west3",
)
@@ -199,7 +199,7 @@ with models.DAG(
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait-for-python-job-async-metric",
- job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
+ job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}",
location="europe-west3",
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
@@ -216,7 +216,7 @@ with models.DAG(
wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait-for-python-job-async-message",
- job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
+ job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}",
location="europe-west3",
callback=check_message,
fail_on_terminal_state=False,
@@ -233,7 +233,7 @@ with models.DAG(
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait-for-python-job-async-autoscaling-event",
- job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
+ job_id="{{task_instance.xcom_pull('start-python-job-async')['id']}}",
location="europe-west3",
callback=check_autoscaling_event,
fail_on_terminal_state=False,