GayathriSrividya opened a new pull request, #68720:
URL: https://github.com/apache/airflow/pull/68720

   Closes #68279
   
   ## Problem
   
   A `BeamRunPythonPipelineOperator` (or its Java counterpart,
   `BeamRunJavaPipelineOperator`) with `deferrable=True` and
   `runner="DataflowRunner"` raises:
   
   ```
   airflow.exceptions.AirflowException: 400 Request must contain a job and project id.
   ```
   
   whenever the Beam launcher subprocess's stdout does not contain the
   `Created job with id: [...]` line. This happens routinely when the pipeline
   does not configure INFO-level logging: the Beam SDK emits that line at INFO,
   while Python's root logger defaults to WARNING.
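
The logging behaviour described above can be reproduced with the standard library alone. This is a minimal sketch, not Beam code: the logger name `hypothetical.beam.launcher`, the helper `capture_info_line`, and the job ID `example-id` are made up for illustration. It only shows that an INFO record is dropped while the root logger sits at its default WARNING level and appears once the level is lowered to INFO.

```python
import io
import logging


def capture_info_line(root_level: int) -> str:
    """Emit one INFO record and return whatever reaches a root handler.

    The logger name and message are illustrative stand-ins, not the
    actual Beam SDK logger.
    """
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    root = logging.getLogger()
    previous_level = root.level
    root.addHandler(handler)
    root.setLevel(root_level)
    try:
        child = logging.getLogger("hypothetical.beam.launcher")
        # The child logger has no level of its own, so it inherits the root's.
        child.info("Created job with id: [%s]", "example-id")
    finally:
        # Restore the root logger so the demo leaves no side effects.
        root.removeHandler(handler)
        root.setLevel(previous_level)
    return stream.getvalue()


if __name__ == "__main__":
    print(repr(capture_info_line(logging.WARNING)))  # INFO record is filtered out
    print(repr(capture_info_line(logging.INFO)))     # INFO record is emitted
```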
   
   The **synchronous** path (`deferrable=False`) already handles this:
   `DataflowHook.wait_for_done()` falls back to resolving the job by name prefix
   when `job_id=None`. The **deferrable** path had no such fallback: it passed
   `job_id=None` directly into the trigger, and the Dataflow API immediately
   rejected the request.
   
   ## Fix
   
   Before building the trigger in `execute_on_dataflow()`, if 
`self.dataflow_job_id` is still `None` after the launcher finishes, call a new 
`DataflowHook.get_job_id_by_name()` helper that resolves the ID via the 
Dataflow REST API by name prefix. This mirrors the existing synchronous 
fallback so both paths behave consistently.
   
   The fix applies to both `BeamRunPythonPipelineOperator` and 
`BeamRunJavaPipelineOperator`.
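
The control flow of the fix can be sketched in isolation as follows. This is not the code from the PR: `JobLookupHook`, `FakeHook`, and `resolve_job_id_before_deferring` are hypothetical names, the single-argument signature of `get_job_id_by_name` is an assumption, and raising when nothing is found is an illustrative choice rather than confirmed behaviour.

```python
from typing import Dict, List, Optional, Protocol


class JobLookupHook(Protocol):
    """Hypothetical interface standing in for the hook described above."""

    def get_job_id_by_name(self, job_name: str) -> Optional[str]: ...


def resolve_job_id_before_deferring(
    parsed_job_id: Optional[str], job_name: str, hook: JobLookupHook
) -> str:
    """Return the ID parsed from stdout, or fall back to a lookup by name."""
    if parsed_job_id is not None:
        # The launcher logged the ID, so no extra API call is needed.
        return parsed_job_id
    resolved = hook.get_job_id_by_name(job_name)
    if resolved is None:
        # Assumption: fail loudly instead of deferring with job_id=None.
        raise RuntimeError(f"No job found with name prefix {job_name!r}")
    return resolved


class FakeHook:
    """In-memory stand-in used only to exercise the sketch."""

    def __init__(self, jobs: Dict[str, str]) -> None:
        self.jobs = jobs              # maps full job name -> job ID
        self.calls: List[str] = []    # records every lookup request

    def get_job_id_by_name(self, job_name: str) -> Optional[str]:
        self.calls.append(job_name)
        for name, job_id in self.jobs.items():
            if name.startswith(job_name):
                return job_id
        return None


if __name__ == "__main__":
    hook = FakeHook({"wordcount-1a2b": "job-123"})
    print(resolve_job_id_before_deferring(None, "wordcount", hook))
```

The lookup happens only on the fallback branch, so a launcher that does log its job ID incurs no additional API request.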
   
   ## Changes
   
   - `providers/google/.../hooks/dataflow.py`: add
     `DataflowHook.get_job_id_by_name()`, which looks up the most recently
     submitted job whose name starts with the given prefix and returns its ID.
   - `providers/apache/beam/.../operators/beam.py`: in `execute_on_dataflow()`
     for both Python and Java operators, call the new helper when
     `dataflow_job_id is None` before deferring.
   - `providers/apache/beam/tests/.../test_beam.py`: add 
`test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing` for 
both operators, asserting `get_job_id_by_name` is called and the trigger 
carries the resolved ID.
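
The selection rule behind the new helper (most recently submitted job whose name starts with the prefix) might look roughly like this when applied to an already-fetched job list. This is a sketch under assumptions, not the hook's implementation: `pick_latest_job_id` is a hypothetical name, the jobs are assumed to be plain dicts with `id`, `name`, and `createTime` keys, and timestamps are assumed to be UTC strings ending in `Z` without fractional seconds.

```python
from datetime import datetime
from typing import Dict, List, Optional


def _parse_utc(timestamp: str) -> datetime:
    """Parse an assumed 'YYYY-MM-DDTHH:MM:SSZ' timestamp into a datetime."""
    # Rewrite the trailing 'Z' so older Python versions can parse the string.
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def pick_latest_job_id(jobs: List[Dict[str, str]], name_prefix: str) -> Optional[str]:
    """Return the ID of the newest job whose name starts with name_prefix."""
    matching = [job for job in jobs if job.get("name", "").startswith(name_prefix)]
    if not matching:
        return None
    # Compare parsed timestamps rather than raw strings.
    latest = max(matching, key=lambda job: _parse_utc(job["createTime"]))
    return latest["id"]


if __name__ == "__main__":
    sample_jobs = [
        {"id": "a", "name": "wordcount-1", "createTime": "2024-05-01T10:00:00Z"},
        {"id": "b", "name": "wordcount-2", "createTime": "2024-05-02T09:30:00Z"},
        {"id": "c", "name": "other-job", "createTime": "2024-05-03T00:00:00Z"},
    ]
    print(pick_latest_job_id(sample_jobs, "wordcount"))
```

Matching on a prefix can select an unrelated job when several pipelines share the same name stem, so callers would want the prefix to be as specific as the generated job name allows.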
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes — GitHub Copilot (Claude Sonnet 4.6)
   
   Generated-by: GitHub Copilot (Claude Sonnet 4.6) following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
