evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3373198542
##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
)
return jobs_controller.is_job_running()
+ @GoogleBaseHook.fallback_to_default_project_id
+ def fetch_job_id_by_name(
+ self,
+ name: str,
+ project_id: str,
+ location: str = DEFAULT_DATAFLOW_LOCATION,
+ ) -> str | None:
+ """
+ Look up a single Dataflow job id by name prefix.
+
+ Returns the id when exactly one active job's name starts with ``name``;
+ ``None`` otherwise.
+ """
+ jobs_controller = _DataflowJobsController(
+ dataflow=self.get_conn(),
+ project_number=project_id,
+ name=name,
+ location=location,
+ poll_sleep=self.poll_sleep,
+ drain_pipeline=self.drain_pipeline,
+ num_retries=self.num_retries,
+ cancel_timeout=self.cancel_timeout,
+ )
+ try:
+ jobs = jobs_controller._get_current_jobs()
+ except Exception:
+ self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+ return None
+ if len(jobs) != 1:
+ return None
Review Comment:
> I see only one scenario when the fail in deferrable mode can be possible when without JobID this infinite loop goes to the end and successfully finished the Job. And after that Operator tries to start deferrable mode and failed because the JobID is empty.
Right, I think this is exactly what happened, and it matches the logs: the job starts at 03:19, the Beam process exits at 04:49, and the task then defers. The error is raised only at 05:30 because the task runs in a pool with limited concurrency.
```
2026-05-28T03:19:52.878371238Z 2 INFO Beam version: 2.71.0
2026-05-28T03:19:52.878596782Z 2 INFO Running command: python3 /tmp/xxx.py --runner=DataflowRunner --job_name=xxx-e8245706 [email protected] --project=xxx --region=europe-west1 --labels=airflow-version=v3-1-7-composer ...
2026-05-28T03:19:52.879618883Z 2 INFO Start waiting for Apache Beam process to complete.
2026-05-28T03:19:54.525101900Z 2 WARNING WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]
2026-05-28T04:15:13.948502540Z 2 WARNING WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
2026-05-28T04:15:13.949025630Z 2 WARNING WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
2026-05-28T04:49:18.090466022Z 2 INFO Process exited with return code: 0
2026-05-28T04:49:18.126811027Z 2 INFO Pausing task as DEFERRED. [dag_id=yyy] [task_id=xxx] [run_id=scheduled__2026-05-26T00:00:00+00:00]
2026-05-28T04:49:18.309519529Z 2 INFO Task finished [task_instance_id=019e6c93-6986-78e0-8590-72a67d0c1bf9] [exit_code=0] [duration=5392.726224065] [final_state=deferred]
2026-05-28T05:30:02.155142307Z 2 INFO Getting connection using `google.auth.default()` since no explicit credentials are provided.
2026-05-28T05:30:02.159145593Z 2 INFO Secrets backends loaded for worker [count=2] [backend_classes=['CloudSecretManagerBackend', 'EnvironmentVariablesBackend']]
2026-05-28T05:30:04.757926940Z 2 INFO DAG bundles loaded: dags-folder
2026-05-28T05:30:04.758570909Z 2 INFO Filling up the DagBag from /home/airflow/gcs/dags/recommendations/yyy/yyy.py
2026-05-28T05:30:24.896252393Z 2 ERROR Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1068, in run
    result = _execute_task(context=context, ti=ti, log=log)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1472, in _execute_task
    result = ctx.run(execute, context=context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/bases/operator.py", line 1633, in resume_execution
    return execute_callable(context, **next_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 300, in execute_complete
    raise AirflowException(event["message"])
airflow.exceptions.AirflowException: 400 Request must contain a job and project id.
```
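The timeline in the log excerpt can be checked with a few lines of arithmetic. This sketch uses only the timestamps shown in the log, truncated to whole seconds. It shows that the worker waited about 1h29m for the Beam process, and that the deferred task resumed roughly 40 minutes after deferring. That gap is consistent with the pool-concurrency explanation, although the log itself does not state the cause.

```python
from datetime import datetime

# Timestamps copied from the log excerpt, truncated to whole seconds.
FMT = "%Y-%m-%dT%H:%M:%S"
beam_started = datetime.strptime("2026-05-28T03:19:52", FMT)  # "Running command"
beam_exited = datetime.strptime("2026-05-28T04:49:18", FMT)   # "Process exited with return code: 0"
resumed = datetime.strptime("2026-05-28T05:30:02", FMT)       # first line after resuming
failed = datetime.strptime("2026-05-28T05:30:24", FMT)        # "Task failed with exception"

beam_runtime = beam_exited - beam_started  # time spent blocking on the Beam process
deferred_for = resumed - beam_exited       # gap between deferring and resuming
time_to_fail = failed - resumed            # resume until execute_complete raised

print(beam_runtime, deferred_for, time_to_fail)  # 1:29:26 0:40:44 0:00:22
```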
I can prepare a reproducible example. Should I open it as a new bug ticket? I can also open a PR right away if we agree on the fix approach. WDYT?
--
This is an automated message from the Apache Git Service.