GayathriSrividya opened a new pull request, #68720: URL: https://github.com/apache/airflow/pull/68720
Closes #68279

## Problem

A `BeamRunPythonPipelineOperator` (or its Java counterpart, `BeamRunJavaPipelineOperator`) configured with `deferrable=True` and `runner="DataflowRunner"` raises:

```
airflow.exceptions.AirflowException: 400 Request must contain a job and project id.
```

This happens whenever the Beam launcher subprocess's stdout does not contain the `Created job with id: [...]` line. That is routine when the pipeline does not configure INFO-level logging: the Beam SDK emits the line at INFO, while Python's root logger defaults to WARNING, so the line is never written.

The **synchronous** path (`deferrable=False`) already handles this: `DataflowHook.wait_for_done()` falls back to resolving the job by name prefix when `job_id=None`. The **deferrable** path had no such fallback; it passed `job_id=None` directly into the trigger, and the Dataflow API rejected the request immediately.

## Fix

In `execute_on_dataflow()`, if `self.dataflow_job_id` is still `None` after the launcher finishes, call a new `DataflowHook.get_job_id_by_name()` helper before building the trigger. The helper resolves the job ID through the Dataflow REST API by job-name prefix. This mirrors the existing synchronous fallback, so both paths behave consistently.

The fix applies to both `BeamRunPythonPipelineOperator` and `BeamRunJavaPipelineOperator`.

## Changes

- `providers/google/.../hooks/dataflow.py`: add `DataflowHook.get_job_id_by_name()`, which looks up the most recently submitted job whose name starts with the given prefix and returns its ID.
- `providers/apache/beam/.../operators/beam.py`: in `execute_on_dataflow()` for both the Python and Java operators, call the new helper when `dataflow_job_id is None`, before deferring.
- `providers/apache/beam/tests/.../test_beam.py`: add `test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing` for both operators, asserting that `get_job_id_by_name` is called and that the trigger carries the resolved ID.

---

##### Was generative AI tooling used to co-author this PR?
- [X] Yes: GitHub Copilot (Claude Sonnet 4.6)

Generated-by: GitHub Copilot (Claude Sonnet 4.6) following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
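The root cause described under Problem can be reproduced with the standard `logging` module alone. The sketch below is a minimal illustration, assuming a hypothetical stand-in logger name (`beam_sdk_standin`) in place of the Beam SDK's own loggers and an invented job ID. With the root logger at its default WARNING level, the INFO record is dropped; at INFO, it is written.

```python
import io
import logging


def captured_output(root_level: int) -> str:
    """Emit one INFO record with the root logger at root_level; return what was written."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    root = logging.getLogger()
    previous_level = root.level
    root.addHandler(handler)
    root.setLevel(root_level)
    try:
        # Hypothetical stand-in for the Beam SDK logger; the job ID is invented.
        logging.getLogger("beam_sdk_standin").info(
            "Created job with id: [2024-01-01_00_00_00-1234567890]"
        )
    finally:
        # Restore global logging state so the sketch has no side effects.
        root.removeHandler(handler)
        root.setLevel(previous_level)
    return stream.getvalue()


# Python's root logger defaults to WARNING, so INFO records are filtered out.
print(repr(captured_output(logging.WARNING)))  # ''
# Configuring INFO (e.g. logging.basicConfig(level=logging.INFO)) lets the line through.
print(repr(captured_output(logging.INFO)))  # 'Created job with id: [2024-01-01_00_00_00-1234567890]\n'
```

This is why configuring INFO logging in the pipeline works around the bug, while the fallback in this PR removes the dependency on logging configuration altogether.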
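For context, the deferrable path only has a `dataflow_job_id` if the `Created job with id: [...]` line shows up in the launcher output. The following is a minimal sketch of that scan, assuming a hypothetical `extract_job_id()` helper and a regex that is not necessarily the provider's own pattern. It shows how the value ends up `None` when the line never appears. The sample lines are made up.

```python
import re
from typing import Iterable, Optional

# Hypothetical pattern for the Python SDK's log line; the provider's real regex may differ.
CREATED_JOB_RE = re.compile(r"Created job with id: \[(?P<job_id>[^\]\s]+)\]")


def extract_job_id(lines: Iterable[str]) -> Optional[str]:
    """Return the first Dataflow job ID announced in launcher output, or None."""
    for line in lines:
        match = CREATED_JOB_RE.search(line)
        if match:
            return match.group("job_id")
    # No announcement line: this is the case the name-prefix fallback must handle.
    return None


with_info = ["INFO:root:Created job with id: [2024-01-01_00_00_00-42]"]
without_info = ["some other launcher output", "more launcher output"]
print(extract_job_id(with_info))     # 2024-01-01_00_00_00-42
print(extract_job_id(without_info))  # None
```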
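The control flow in the Fix section resolves the ID before deferring instead of passing `None` into the trigger. It can be sketched with a stubbed lookup, in the spirit of the new `test_exec_dataflow_runner_defers_with_resolved_job_id_when_stdout_missing` tests. `FakeTrigger` and `build_trigger()` are hypothetical stand-ins for the operator logic and the Dataflow trigger, and the lookup's one-argument signature is an assumption. Raising when the lookup also comes back empty is likewise a design choice the PR text does not specify.

```python
from dataclasses import dataclass
from typing import Callable, Optional
from unittest import mock


@dataclass
class FakeTrigger:
    """Hypothetical stand-in for the trigger the operator defers to."""
    job_id: str
    project_id: str


def build_trigger(
    job_id: Optional[str],
    job_name: str,
    project_id: str,
    get_job_id_by_name: Callable[[str], Optional[str]],
) -> FakeTrigger:
    """Build the trigger, resolving a missing job ID by name prefix first."""
    if job_id is None:
        # Mirror the synchronous path instead of deferring with job_id=None.
        job_id = get_job_id_by_name(job_name)
    if job_id is None:
        # Assumption: fail fast with a clear message rather than a 400 from the API.
        raise ValueError(f"No Dataflow job found with name prefix {job_name!r}")
    return FakeTrigger(job_id=job_id, project_id=project_id)


# Simulate a launcher whose stdout lacked the "Created job with id" line.
lookup = mock.Mock(return_value="2024-01-01_00_00_00-42")
trigger = build_trigger(None, "my-pipeline", "my-project", lookup)
lookup.assert_called_once_with("my-pipeline")
print(trigger.job_id)  # 2024-01-01_00_00_00-42
```

When the ID was captured from stdout, the lookup is never called, so the happy path costs no extra API request.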
