SameerMesiah97 commented on code in PR #66625:
URL: https://github.com/apache/airflow/pull/66625#discussion_r3213772623
##########
providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py:
##########
@@ -246,6 +246,59 @@ def test_list_pipeline_jobs(self, mock_client) -> None:
)
mock_client.return_value.common_location_path.assert_called_once_with(TEST_PROJECT_ID,
TEST_REGION)
+ # ----- Regression: reserved_ip_ranges plumbing (#62733)
---------------------
+ # These tests exercise the optional reserved_ip_ranges parameter end-to-end
+ # through both submit_pipeline_job and run_pipeline_job, to keep the kwarg
+ # forwarded to PipelineJob.submit() in lockstep with the operator surface.
+ # We assert by-keyword to make sure the value lands on the SDK boundary
+ # rather than being silently swallowed (the bug behavior on origin/main).
+
+
@mock.patch(PIPELINE_JOB_STRING.format("PipelineJobHook.get_pipeline_job_object"))
+ def test_submit_pipeline_job_forwards_reserved_ip_ranges(self,
mock_get_job) -> None:
+ mock_job = mock_get_job.return_value
+ self.hook.submit_pipeline_job(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ display_name="display",
+ template_path="gs://bucket/template.json",
+ reserved_ip_ranges=["range-1", "range-2"],
+ )
+ mock_job.submit.assert_called_once()
+ _, submit_kwargs = mock_job.submit.call_args
+ assert submit_kwargs.get("reserved_ip_ranges") == ["range-1",
"range-2"]
+
+
@mock.patch(PIPELINE_JOB_STRING.format("PipelineJobHook.get_pipeline_job_object"))
+ def test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset(self,
mock_get_job) -> None:
+ # When unset, the kwarg must not appear in the submit() call so
+ # users on older google-cloud-aiplatform releases (which would reject
+ # unknown kwargs) keep working.
+ mock_job = mock_get_job.return_value
+ self.hook.submit_pipeline_job(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ display_name="display",
+ template_path="gs://bucket/template.json",
+ )
+ mock_job.submit.assert_called_once()
+ _, submit_kwargs = mock_job.submit.call_args
+ assert "reserved_ip_ranges" not in submit_kwargs
+
+
@mock.patch(PIPELINE_JOB_STRING.format("PipelineJobHook.get_pipeline_job_object"))
+ def test_run_pipeline_job_forwards_reserved_ip_ranges(self, mock_get_job)
-> None:
+ mock_job = mock_get_job.return_value
+ self.hook.run_pipeline_job(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ display_name="display",
+ template_path="gs://bucket/template.json",
+ reserved_ip_ranges=["range-1"],
+ )
+ mock_job.submit.assert_called_once()
+ _, submit_kwargs = mock_job.submit.call_args
+ assert submit_kwargs.get("reserved_ip_ranges") == ["range-1"]
+ # run_pipeline_job is the synchronous variant: it must also block on
wait()
+ mock_job.wait.assert_called_once()
+
Review Comment:
I would add a test for an empty `reserved_ip_ranges` list, to pin down that `[]` is still
forwarded to `submit()` (only `None` should be omitted). Please refer to the test below
for guidance:
```
@mock.patch(PIPELINE_JOB_STRING.format("PipelineJobHook.get_pipeline_job_object"))
def test_submit_pipeline_job_forwards_empty_reserved_ip_ranges(self,
mock_get_job) -> None:
mock_job = mock_get_job.return_value
self.hook.submit_pipeline_job(
project_id=TEST_PROJECT_ID,
region=TEST_REGION,
display_name="display",
template_path="gs://bucket/template.json",
reserved_ip_ranges=[],
)
mock_job.submit.assert_called_once()
_, submit_kwargs = mock_job.submit.call_args
assert submit_kwargs["reserved_ip_ranges"] == []
```
##########
providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/pipeline_job.py:
##########
Review Comment:
It looks like you will need to add the check you added above here as well:
```
submit_kwargs = {
"project_id": self.project_id,
"region": self.region,
"display_name": self.display_name,
"template_path": self.template_path,
"job_id": self.job_id,
"pipeline_root": self.pipeline_root,
"parameter_values": self.parameter_values,
"input_artifacts": self.input_artifacts,
"enable_caching": self.enable_caching,
"encryption_spec_key_name": self.encryption_spec_key_name,
"labels": self.labels,
"failure_policy": self.failure_policy,
"service_account": self.service_account,
"network": self.network,
"create_request_timeout": self.create_request_timeout,
"experiment": self.experiment,
}
if self.reserved_ip_ranges is not None:
submit_kwargs["reserved_ip_ranges"] = self.reserved_ip_ranges
```
Right now, it always passes `None` for `reserved_ip_ranges`, which is
causing `TestVertexAIRunPipelineJobOperator::test_execute` to fail.
##########
providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/pipeline_job.py:
##########
@@ -92,6 +92,10 @@ class RunPipelineJobOperator(GoogleCloudBaseOperator):
Metrics produced by the PipelineJob as system.Metric Artifacts will be
associated as metrics
to the current Experiment Run. Pipeline parameters will be associated
as parameters to
the current Experiment Run.
+ :param reserved_ip_ranges: Optional. A list of names for the reserved IP
ranges under the VPC network
+ that can be used for this PipelineJob. If set, only IP addresses from
these reserved ranges will
+ be used; otherwise, all IPs in the VPC are eligible. Requires
``network`` to be configured.
+ See
https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
Review Comment:
The link could be removed to make it consistent with the other docstrings
for the same parameter.
##########
providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py:
##########
@@ -246,6 +246,59 @@ def test_list_pipeline_jobs(self, mock_client) -> None:
)
mock_client.return_value.common_location_path.assert_called_once_with(TEST_PROJECT_ID,
TEST_REGION)
+ # ----- Regression: reserved_ip_ranges plumbing (#62733)
---------------------
+ # These tests exercise the optional reserved_ip_ranges parameter end-to-end
+ # through both submit_pipeline_job and run_pipeline_job, to keep the kwarg
+ # forwarded to PipelineJob.submit() in lockstep with the operator surface.
+ # We assert by-keyword to make sure the value lands on the SDK boundary
+ # rather than being silently swallowed (the bug behavior on origin/main).
Review Comment:
I think this would be better communicated as shorter comments and/or
docstrings on the individual tests. Right now, the large block reads more like
PR rationale than test-specific context.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]