ibzib commented on a change in pull request #11726:
URL: https://github.com/apache/airflow/pull/11726#discussion_r516185504
##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow.py
##########
@@ -127,6 +130,40 @@
py_interpreter='python3',
py_system_site_packages=False,
)
+ # [END howto_operator_start_python_job]
+
+
+with models.DAG(
+ "example_gcp_dataflow_native_python_async",
+ default_args=default_args,
+ start_date=days_ago(1),
+ schedule_interval=None, # Override to match your needs
Review comment:
Should we link to some documentation for `schedule_interval`?
##########
File path: airflow/providers/google/cloud/example_dags/example_dataflow.py
##########
@@ -127,6 +130,40 @@
py_interpreter='python3',
py_system_site_packages=False,
)
+ # [END howto_operator_start_python_job]
+
+
+with models.DAG(
+ "example_gcp_dataflow_native_python_async",
+ default_args=default_args,
+ start_date=days_ago(1),
+ schedule_interval=None, # Override to match your needs
+ tags=['example'],
+) as dag_native_python_async:
+ start_python_job_async = DataflowCreatePythonJobOperator(
+ task_id="start-python-job-async",
+ py_file=GCS_PYTHON,
+ py_options=[],
+ job_name='{{task.task_id}}',
+ options={
+ 'output': GCS_OUTPUT,
+ },
+ py_requirements=['apache-beam[gcp]==2.21.0'],
Review comment:
Why pin to Beam 2.21.0 specifically? A comment explaining the version choice (or bumping to the latest release) would help.
##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+ DataflowHook,
+ DEFAULT_DATAFLOW_LOCATION,
+ DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+ """
+ Checks for the status of a job in Google Dataflow.
+
+ :param job_id: ID of the job to be checked.
+ :type job_id: str
+ :param expected_statuses: The expected state of the operation.
+ See:
+
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+ :type expected_statuses: Union[Set[str], str]
+ :param project_id: Optional, the Google Cloud project ID in which to start
a job.
+ If set to None or missing, the default project_id from the Google
Cloud connection is used.
+ :type project_id: str
+ :param location: Job location.
+ :type location: str
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :type gcp_conn_id: str
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
Review comment:
Links to GCP documentation for these fields would also be useful.
##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+ DataflowHook,
+ DEFAULT_DATAFLOW_LOCATION,
+ DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+ """
+ Checks for the status of a job in Google Dataflow.
+
+ :param job_id: ID of the job to be checked.
+ :type job_id: str
+ :param expected_statuses: The expected state of the operation.
+ See:
+
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
+ :type expected_statuses: Union[Set[str], str]
+ :param project_id: Optional, the Google Cloud project ID in which to start
a job.
+ If set to None or missing, the default project_id from the Google
Cloud connection is used.
+ :type project_id: str
+ :param location: Job location.
Review comment:
Same as above: please link to the relevant GCP documentation for this field as well.
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -147,9 +147,16 @@ class _DataflowJobsController(LoggingMixin):
not by specific job ID, then actions will be performed on all matching
jobs.
:param drain_pipeline: Optional, set to True if want to stop streaming job
by draining it
instead of canceling.
+ :param wait_until_finished: If True, wait for the end of pipeline
execution before exiting. If False,
+ it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+ The default behavior depends on the type of pipeline:
Review comment:
This seems reasonable.
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -266,16 +283,18 @@ def _check_dataflow_job_state(self, job) -> bool:
:rtype: bool
:raise: Exception
"""
+ if self._wait_until_finished is None:
+ wait_until_finished = DataflowJobType.JOB_TYPE_STREAMING !=
job['type']
Review comment:
Nit: avoid [Yoda
conditions](https://en.wikipedia.org/wiki/Yoda_conditions).
##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
`https://cloud.google.com/dataflow/pipelines/specifying-exec-params
<https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
:type environment: Optional[dict]
+ :param wait_until_finished: (Optional)
+ If True, wait for the end of pipeline execution before exiting. If
False,
+ it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+ The default behavior depends on the type of pipeline:
+
+ * for the streaming pipeline, wait for jobs to start,
+ * for the batch pipeline, wait for the jobs to complete.
+
+ .. warning::
+
+ You cannot call ``PipelineResult.wait_until_finish`` method in
your pipeline code for the operator
+ to work properly. i. e. you must use asynchronous execution.
Otherwise, your pipeline will
+ always wait until finished. For more information, look at:
Review comment:
By "always wait until finished," do you mean it will block forever?
Why does this happen?
##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -920,3 +942,30 @@ def cancel_job(
drain_pipeline=self.drain_pipeline,
)
jobs_controller.cancel()
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_job(
+ self,
+ job_id: str,
+ project_id: str,
+ location: str = DEFAULT_DATAFLOW_LOCATION,
+ ) -> dict:
+ """
+ Gets the job with the specified Job ID.
+
+ :param job_id: Job ID to get.
+ :type job_id: str
+ :param project_id: Optional, the Google Cloud project ID in which to
start a job.
+ If set to None or missing, the default project_id from the Google
Cloud connection is used.
+ :type project_id:
+ :param location: Job location.
Review comment:
Clarify what "location" means (it's the regional endpoint) and provide
an example value (`us-central1`). Add a link to
https://cloud.google.com/dataflow/docs/concepts/regional-endpoints.
##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
+from typing import Optional, Sequence, Union, Set
+
+from airflow.exceptions import AirflowException
+
+from airflow.providers.google.cloud.hooks.dataflow import (
+ DataflowHook,
+ DEFAULT_DATAFLOW_LOCATION,
+ DataflowJobStatus,
+)
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class DataflowJobStatusSensor(BaseSensorOperator):
+ """
+ Checks for the status of a job in Google Dataflow.
Review comment:
Nit: branding
```suggestion
Checks for the status of a job in Google Cloud Dataflow.
```
##########
File path: airflow/providers/google/cloud/sensors/dataflow.py
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Google Bigquery sensor."""
Review comment:
```suggestion
"""This module contains a Google Cloud Dataflow sensor."""
```
##########
File path: airflow/providers/google/cloud/operators/dataflow.py
##########
@@ -324,6 +344,23 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
`https://cloud.google.com/dataflow/pipelines/specifying-exec-params
<https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment>`__
:type environment: Optional[dict]
+ :param wait_until_finished: (Optional)
+ If True, wait for the end of pipeline execution before exiting. If
False,
+ it only waits for it to starts (``JOB_STATE_RUNNING``).
+
+ The default behavior depends on the type of pipeline:
+
+ * for the streaming pipeline, wait for jobs to start,
+ * for the batch pipeline, wait for the jobs to complete.
+
+ .. warning::
+
+ You cannot call ``PipelineResult.wait_until_finish`` method in
your pipeline code for the operator
Review comment:
Note that `wait_until_finish` is also called implicitly by `with Pipeline()
as p:`. Since `with` is the recommended way to run pipelines, I am
hesitant about this part — especially since the user likely has not even
written the template's pipeline code themselves.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]