jbbqqf opened a new pull request, #66625:
URL: https://github.com/apache/airflow/pull/66625

   <!--
   Was generative AI tooling used to co-author this PR?
   - [X] Yes — Claude Code (https://claude.com/claude-code), per
     contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions.
     Generated-by: Claude Code following the guidelines.
     All code was reviewed against google-cloud-aiplatform's PipelineJob.submit
     signature and against the existing pattern used by RayClusterOperator
     (which already plumbs `reserved_ip_ranges` end-to-end). Tests pass locally.
   -->
   
   closes: #62733
   
   ## Summary
   
   `PipelineJobHook.run_pipeline_job` and `PipelineJobHook.submit_pipeline_job`
   accept `service_account`, `network`, `create_request_timeout`, and
   `experiment`, but neither forwards `reserved_ip_ranges` to the
   underlying `PipelineJob.submit()` call, even though
   `google-cloud-aiplatform` supports the kwarg and the Vertex AI Ray cluster
   operator already plumbs it through.
   
   Users running Vertex AI pipelines on a VPC with reserved peering ranges had
   no way to constrain a pipeline to those ranges through
   `RunPipelineJobOperator`.
   
   This PR adds the optional `reserved_ip_ranges: list[str] | None` parameter
   to both hook entry points and to `RunPipelineJobOperator`, and forwards it
   to `PipelineJob.submit()` **only when explicitly set** so that users still
   on an older aiplatform release that doesn't accept the kwarg keep working.
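
The conditional forwarding described above can be sketched roughly as follows. This is a minimal illustration, not the provider code: `build_submit_kwargs` is a hypothetical helper name, and the real hook may assemble its arguments differently.

```python
from __future__ import annotations

from typing import Any


def build_submit_kwargs(
    service_account: str | None = None,
    network: str | None = None,
    create_request_timeout: float | None = None,
    experiment: str | None = None,
    reserved_ip_ranges: list[str] | None = None,
) -> dict[str, Any]:
    """Hypothetical helper: assemble keyword arguments for PipelineJob.submit()."""
    kwargs: dict[str, Any] = {
        "service_account": service_account,
        "network": network,
        "create_request_timeout": create_request_timeout,
        "experiment": experiment,
    }
    # Only include the new kwarg when the caller set it, so an older
    # aiplatform release that does not know the kwarg is never handed it.
    if reserved_ip_ranges is not None:
        kwargs["reserved_ip_ranges"] = reserved_ip_ranges
    return kwargs
```

Under this sketch, a call site would look like `pipeline_job.submit(**build_submit_kwargs(...))`, and the default path passes exactly the same keyword arguments as before.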
   
   ## Context
   
   - aiplatform reference: [`PipelineJob.submit()`](https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit),
     with `reserved_ip_ranges: Optional[List[str]] = None`.
   - Existing precedent in this provider:
     `providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/ray.py:189`
     already plumbs `reserved_ip_ranges` through to the Ray cluster create call.
   
   ## Changes
   
   - `providers/google/src/airflow/providers/google/cloud/hooks/vertex_ai/pipeline_job.py`
     — add `reserved_ip_ranges` to `run_pipeline_job` and
     `submit_pipeline_job`. The `submit()` call uses a kwargs dict that only
     includes `reserved_ip_ranges` when it's set; a one-line code comment
     documents *why* (older aiplatform versions reject unknown kwargs).
   - `providers/google/src/airflow/providers/google/cloud/operators/vertex_ai/pipeline_job.py`
     — add `reserved_ip_ranges` to `RunPipelineJobOperator.__init__` and
     forward it to `submit_pipeline_job(...)`.
   - `providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py`
     — three regression tests:
     - `test_submit_pipeline_job_forwards_reserved_ip_ranges` — value lands on
       the SDK boundary;
     - `test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset` —
       backwards-compat: kwarg absent when `None`;
     - `test_run_pipeline_job_forwards_reserved_ip_ranges` — same for the
       synchronous variant + asserts `wait()` is called.
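
The shape of the first two regression tests can be sketched with `unittest.mock`. This is an illustrative stand-in, not the tests from the PR: `submit_job` is a hypothetical function that mimics the hook's conditional forwarding, and the job object is a plain `MagicMock` rather than a patched `PipelineJob`.

```python
from unittest import mock


def submit_job(job, reserved_ip_ranges=None):
    """Hypothetical stand-in for the hook logic (not the provider code)."""
    submit_kwargs = {}
    # Forward the kwarg only when the caller set it.
    if reserved_ip_ranges is not None:
        submit_kwargs["reserved_ip_ranges"] = reserved_ip_ranges
    job.submit(**submit_kwargs)
    return job


def check_forwards_value():
    job = mock.MagicMock()
    submit_job(job, reserved_ip_ranges=["range-1", "range-2"])
    job.submit.assert_called_once()
    # The full list reaches the mocked SDK boundary unchanged.
    assert job.submit.call_args.kwargs["reserved_ip_ranges"] == ["range-1", "range-2"]


def check_omits_when_unset():
    job = mock.MagicMock()
    submit_job(job)
    job.submit.assert_called_once()
    # Backwards compatibility: the kwarg is absent, not passed as None.
    assert "reserved_ip_ranges" not in job.submit.call_args.kwargs
```

Asserting on `call_args.kwargs` rather than using `assert_called_once_with(...)` keeps the "omitted" case explicit: the check fails if the kwarg is passed at all, even with a value of `None`.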
   
   ## Reproduce BEFORE/AFTER yourself (copy-paste)
   
   ```bash
   # --- one-time setup (skip if you already have a working Airflow dev env) ---
   git clone https://github.com/apache/airflow.git /tmp/repro-62733 && cd /tmp/repro-62733
   python -m venv .venv && source .venv/bin/activate
   pip install -e ./task-sdk -e ./airflow-core -e ./providers/google
   pip install pytest google-cloud-aiplatform google-auth-httplib2 \
       google-api-python-client google-cloud-secret-manager \
       gcloud-aio-bigquery gcloud-aio-storage gcloud-aio-auth time-machine
   
   # --- BEFORE (origin/main) — should FAIL ---
   git checkout origin/main
   git checkout feat/62733-reserved-ip-ranges-pipeline-job -- \
       providers/google/tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py
   cd providers/google
   PYTHONPATH=../../devel-common/src:. python -m pytest \
       tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py \
       -k reserved_ip_ranges -xvs --rootdir=.
   # Expected: TypeError: PipelineJobHook.submit_pipeline_job() got an unexpected
   #           keyword argument 'reserved_ip_ranges'  → FAILED.
   
   # --- AFTER (this PR) — should PASS ---
   cd /tmp/repro-62733
   git checkout feat/62733-reserved-ip-ranges-pipeline-job
   cd providers/google
   PYTHONPATH=../../devel-common/src:. python -m pytest \
       tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py \
       -k reserved_ip_ranges -xvs --rootdir=.
   # Expected: 3 passed.
   ```
   
   The reviewer can also run the full hook test file to confirm no regression
   elsewhere: same command without `-k reserved_ip_ranges` (22 tests).
   
   ## What I ran locally
   
   - `pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -k reserved_ip_ranges -xvs` → **3/3 passed**
   - `pytest tests/unit/google/cloud/hooks/vertex_ai/test_pipeline_job.py -v` → **22/22 passed** (full file, no regression)
   - BEFORE, with the implementation stashed and only the tests applied: the
     three new tests fail with
     `TypeError: ... unexpected keyword argument 'reserved_ip_ranges'`,
     which is the behavior reported in the issue.
   
   ## Edge cases tested
   
   | # | Scenario | Input | Expected | Verified by |
   |---|----------|-------|----------|-------------|
   | 1 | Default (unset) | `reserved_ip_ranges=None` | kwarg is **not** forwarded to `submit()` so older aiplatform releases keep working | `test_submit_pipeline_job_omits_reserved_ip_ranges_when_unset` |
   | 2 | Single range | `["range-1"]` | value lands on `submit()` unchanged; `wait()` is also called for the sync variant | `test_run_pipeline_job_forwards_reserved_ip_ranges` |
   | 3 | Multiple ranges | `["range-1", "range-2"]` | full list lands on `submit()` unchanged | `test_submit_pipeline_job_forwards_reserved_ip_ranges` |
   
   ## Risk / blast radius
   
   Additive only. Existing call sites behave as before:
   `reserved_ip_ranges` defaults to `None`, and the `submit()` call only
   forwards it when non-`None`. Users on the deferrable path (`deferrable=True`)
   are unaffected: `submit_pipeline_job` is what feeds the deferrable trigger,
   and it still returns the same `PipelineJob`.
   
   ## Release note
   
   ```release-note
   Add ``reserved_ip_ranges`` parameter to ``RunPipelineJobOperator`` and the
   underlying Vertex AI ``PipelineJobHook.run_pipeline_job`` /
   ``submit_pipeline_job`` methods, allowing users to pin a Vertex AI Pipeline
   Job to specific reserved IP ranges of a peered VPC.
   ```
   
   ---
   
   *PR drafted with assistance from Claude Code (per
   `contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions`).
   The change was reviewed manually against `google-cloud-aiplatform`'s
   `PipelineJob.submit` signature and the precedent set by
   `providers/google/.../operators/vertex_ai/ray.py:189`. The reproducer
   block above was used during development; it is the same one a reviewer can
   paste verbatim. The contributor takes full responsibility for the patch.*
   

