evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3373198542


##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None

Review Comment:
   > I see only one scenario in which the failure in deferrable mode is 
possible: without a JobID, this infinite loop runs to the end and the Job 
finishes successfully. After that, the Operator tries to start deferrable mode 
and fails because the JobID is empty. 
   
   Right. I think this is exactly what happened. It matches the logs: the job 
starts at 03:19 and completes at 04:49, and the task then defers. The error is 
raised only at 05:30 because the task runs in a pool with limited concurrency.
   
   ```
   2026-05-28T03:19:52.878371238Z       2       INFO    Beam version: 2.71.0
   2026-05-28T03:19:52.878596782Z       2       INFO    Running command: python3 /tmp/xxx.py --runner=DataflowRunner --job_name=xxx-e8245706 [email protected] --project=xxx --region=europe-west1 --labels=airflow-version=v3-1-7-composer ...
   2026-05-28T03:19:52.879618883Z       2       INFO    Start waiting for Apache Beam process to complete.
   2026-05-28T03:19:54.525101900Z       2       WARNING WARNING:root:crcmod package not found. This package is required if python-snappy or google-crc32c are not installed. To ensure crcmod is installed, install the tfrecord extra: pip install apache-beam[tfrecord]
   2026-05-28T04:15:13.948502540Z       2       WARNING WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
   2026-05-28T04:15:13.949025630Z       2       WARNING WARNING:google_auth_httplib2:httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
   2026-05-28T04:49:18.090466022Z       2       INFO    Process exited with return code: 0
   2026-05-28T04:49:18.126811027Z       2       INFO    Pausing task as DEFERRED.  [dag_id=yyy] [task_id=xxx] [run_id=scheduled__2026-05-26T00:00:00+00:00]
   2026-05-28T04:49:18.309519529Z       2       INFO    Task finished [task_instance_id=019e6c93-6986-78e0-8590-72a67d0c1bf9] [exit_code=0] [duration=5392.726224065] [final_state=deferred]
   2026-05-28T05:30:02.155142307Z       2       INFO    Getting connection using `google.auth.default()` since no explicit credentials are provided.
   2026-05-28T05:30:02.159145593Z       2       INFO    Secrets backends loaded for worker [count=2] [backend_classes=['CloudSecretManagerBackend', 'EnvironmentVariablesBackend']]
   2026-05-28T05:30:04.757926940Z       2       INFO    DAG bundles loaded: dags-folder
   2026-05-28T05:30:04.758570909Z       2       INFO    Filling up the DagBag from /home/airflow/gcs/dags/recommendations/yyy/yyy.py
   2026-05-28T05:30:24.896252393Z       2       ERROR   Task failed with exception
   Traceback (most recent call last):
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1068, in run
       result = _execute_task(context=context, ti=ti, log=log)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/execution_time/task_runner.py", line 1472, in _execute_task
       result = ctx.run(execute, context=context)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/sdk/bases/operator.py", line 1633, in resume_execution
       return execute_callable(context, **next_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/beam/operators/beam.py", line 300, in execute_complete
       raise AirflowException(event["message"])
   airflow.exceptions.AirflowException: 400 Request must contain a job and project id.
   ```
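
To make the scenario concrete, here is a minimal standalone sketch of the kind of guard that would avoid deferring with an empty job id. It is not the operator's actual code and not an agreed fix: `decide_after_launch` and `DeferDecision` are hypothetical names, and the assumption that a zero return code with no captured job id means the pipeline already ran to completion comes only from the scenario described above.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DeferDecision:
    """Hypothetical result type: whether to defer, and why."""

    should_defer: bool
    reason: str


def decide_after_launch(job_id: str | None, return_code: int) -> DeferDecision:
    """Decide whether deferring is safe once the launcher process has exited.

    Hypothetical helper for illustration only; it does not call Airflow.
    """
    if return_code != 0:
        # A failed launcher is an error regardless of the job id.
        raise RuntimeError(f"Launcher exited with return code {return_code}")
    if not job_id:
        # Assumption from the thread: the launcher blocked until the pipeline
        # finished, so there is no job left to wait for and deferring would
        # only hand the trigger an empty job id.
        return DeferDecision(False, "no job id captured; skipping deferral")
    return DeferDecision(True, f"deferring to wait for job {job_id}")
```

With the values from the log above (return code 0 and no job id), `decide_after_launch(None, 0)` returns a decision with `should_defer=False` instead of deferring and failing later with the 400 error.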
   
   I can prepare an example to reproduce. Should I open it as a new bug ticket?
   I can also create a PR right away if we agree on the fix approach. WDYT?


