shahar1 commented on code in PR #61290:
URL: https://github.com/apache/airflow/pull/61290#discussion_r2751635049
##########
providers/google/tests/system/google/cloud/datafusion/example_datafusion.py:
##########
@@ -271,6 +271,18 @@ def get_artifacts_versions(ti=None):
)
# [END howto_cloud_data_fusion_start_pipeline_def]
+ # [START howto_cloud_data_fusion_start_pipeline_def_sensor]
+ start_pipeline_def_sensor = CloudDataFusionPipelineStateSensor(
+ task_id="pipeline_state_sensor_def",
+ pipeline_name=PIPELINE_NAME,
+ pipeline_id=start_pipeline_def.output,
+ expected_statuses=["COMPLETED"],
+ failure_statuses=["FAILED"],
+ instance_name=INSTANCE_NAME,
+ location=LOCATION,
+ )
+ # [END howto_cloud_data_fusion_start_pipeline_def_sensor]
Review Comment:
Is the sensor necessary? If it is, I'd remove the
`howto_cloud_data_fusion_start_pipeline_def_sensor` markers for now (you could
open a separate PR that adds them and updates the sensor's docs). If not,
please remove the sensor.
##########
providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py:
##########
@@ -532,23 +532,32 @@ def stop_pipeline(
instance_url: str,
namespace: str = "default",
pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
+ run_id: str | None = None,
) -> None:
"""
Stop a Cloud Data Fusion pipeline. Works for both batch and stream
pipelines.
:param pipeline_name: Your pipeline name.
:param instance_url: Endpoint on which the REST APIs is accessible for
the instance.
- :param namespace: f your pipeline belongs to a Basic edition instance,
the namespace ID
+ :param pipeline_type: Can be either BATCH or STREAM.
+ :param run_id: The specific run_id to stop execution if available;
when absent it will stop all runs under pipeline_name.
+ :param namespace: If your pipeline belongs to a Basic edition
instance, the namespace ID
is always default. If your pipeline belongs to an Enterprise
edition instance, you
can create a namespace.
"""
- url = os.path.join(
+ base_stop_url = os.path.join(
self._base_url(instance_url, namespace),
quote(pipeline_name),
self.cdap_program_type(pipeline_type=pipeline_type),
self.cdap_program_id(pipeline_type=pipeline_type),
- "stop",
)
+
+ if run_id:
+ url = os.path.join(base_stop_url, "runs", quote(str(run_id)), "stop")
+
Review Comment:
nit: remove empty line
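For context, a minimal sketch of the URL branching this hunk appears to be building. It assumes that when `run_id` is absent the URL falls back to the original `.../stop` path; that branch is not visible in the hunk. `build_stop_url` and its `base_url`, `program_type`, and `program_id` parameters are hypothetical stand-ins for the hook's `_base_url`, `cdap_program_type`, and `cdap_program_id`, and `posixpath.join` is used here instead of `os.path.join` so that the separator is always `/`.

```python
import posixpath
from urllib.parse import quote


def build_stop_url(base_url, pipeline_name, program_type, program_id, run_id=None):
    """Hypothetical helper: build the stop URL for a pipeline or for one run."""
    # Shared prefix: <base>/<pipeline>/<program type>/<program id>
    base_stop_url = posixpath.join(
        base_url, quote(pipeline_name), program_type, program_id
    )
    if run_id:
        # Stop only the given run.
        return posixpath.join(base_stop_url, "runs", quote(str(run_id)), "stop")
    # Assumed fallback (not shown in the hunk): stop every run of the pipeline.
    return posixpath.join(base_stop_url, "stop")
```

Returning from each branch keeps `url` from being left unassigned when `run_id` is `None`.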
##########
providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py:
##########
@@ -532,23 +532,32 @@ def stop_pipeline(
instance_url: str,
namespace: str = "default",
pipeline_type: DataFusionPipelineType = DataFusionPipelineType.BATCH,
+ run_id: str | None = None,
) -> None:
"""
Stop a Cloud Data Fusion pipeline. Works for both batch and stream
pipelines.
:param pipeline_name: Your pipeline name.
:param instance_url: Endpoint on which the REST APIs is accessible for
the instance.
- :param namespace: f your pipeline belongs to a Basic edition instance,
the namespace ID
+ :param pipeline_type: Can be either BATCH or STREAM.
+ :param run_id: The specific run_id to stop execution if available;
when absent it will stop all runs under pipeline_name.
+ :param namespace: If your pipeline belongs to a Basic edition
instance, the namespace ID
Review Comment:
Could you please reorder the docstring parameters to match the order of the
method's signature?
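One possible ordering, as a sketch only. `stop_pipeline_stub` is a hypothetical standalone function, `pipeline_type` is typed as `str` to keep the example self-contained, and it assumes `pipeline_name` precedes `instance_url` in the real signature, which the hunk does not show. The parameter descriptions are adapted from the hunk above; `docstring_param_order` is an illustrative helper for checking the order.

```python
from __future__ import annotations

import inspect
import re


def stop_pipeline_stub(
    pipeline_name: str,
    instance_url: str,
    namespace: str = "default",
    pipeline_type: str = "BATCH",
    run_id: str | None = None,
) -> None:
    """
    Stop a Cloud Data Fusion pipeline. Works for both batch and stream pipelines.

    :param pipeline_name: Your pipeline name.
    :param instance_url: Endpoint on which the REST API is accessible for the instance.
    :param namespace: If your pipeline belongs to a Basic edition instance, the namespace ID
        is always default. If your pipeline belongs to an Enterprise edition instance, you
        can create a namespace.
    :param pipeline_type: Can be either BATCH or STREAM.
    :param run_id: The specific run_id to stop execution if available; when absent it will
        stop all runs under pipeline_name.
    """


def docstring_param_order(func) -> list[str]:
    """Return the names listed in ``:param name:`` fields, in docstring order."""
    return re.findall(r":param (\w+):", func.__doc__ or "")


# The documented order should match the signature order.
assert docstring_param_order(stop_pipeline_stub) == list(
    inspect.signature(stop_pipeline_stub).parameters
)
```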
##########
providers/google/src/airflow/providers/google/cloud/operators/datafusion.py:
##########
@@ -901,13 +901,15 @@ class CloudDataFusionStopPipelineOperator(GoogleCloudBaseOperator):
:ref:`howto/operator:CloudDataFusionStopPipelineOperator`
:param pipeline_name: Your pipeline name.
+ :param pipeline_type: Nature of Pipeline, by default BATCH (workflows).
:param instance_name: The name of the instance.
:param location: The Cloud Data Fusion location in which to handle the
request.
:param namespace: If your pipeline belongs to a Basic edition instance,
the namespace ID
is always default. If your pipeline belongs to an Enterprise edition
instance, you
can create a namespace.
:param api_version: The version of the api that will be requested for
example 'v3'.
:param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param run_id: To stop a particular run tagged with provided run_id.
Review Comment:
nit: Could you please make the description consistent with the hook? (just
copy and paste)
##########
providers/google/tests/system/google/cloud/datafusion/example_datafusion.py:
##########
@@ -61,19 +61,19 @@
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
Review Comment:
Please revert the changes in this file that are not strictly required for
running it (naming, resources, etc.).
I assume you made them in order to run the test locally, but I want to reduce
the chance of failures when the tests run automatically on the Google team's
project.