MaksYermak commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3372609757


##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None

Review Comment:
   > It works only when STDOUT contains the job ID. In my case the job's output 
didn't include it. So jobId=None. The divergence is in how deferrable/worker 
mode treat missing job IDs. Trigger fails immediately while the sync worker 
path lists all jobs by the name and monitors statuses of all matching jobs (I 
linked exact code lines in the previous comment).
   It's a real bug: all tasks in deferrable mode whose job outputs didn't match 
Job ID regex will fail.
   
   Hmm, I still do not understand how, in your case, deferrable mode can start 
without a `JobID`. The current code has this 
[logic](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py#L149-L150)
 for `process_fd` and this 
[logic](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py#L190-L192)
 for `run_beam_command`. As you can see, the code has a `while True` 
[loop](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py#L182-L195)
 that reads logs from the Beam run process until the job finishes. The code 
leaves this loop and starts waiting via the Dataflow API, in `deferrable` or 
`non-deferrable` mode, only when a `JobID` is present. Otherwise, without a 
`JobID`, the code runs your job in `non-deferrable` mode to the end and never 
uses the Dataflow API to check its status.
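
   To make the control flow described above concrete, here is a minimal, 
self-contained sketch. It is not the provider's code: `read_until_job_id` and 
`JOB_ID_PATTERN` are hypothetical names, and the regex is a simplified 
assumption about what a job id line in the Beam output might look like.

```python
import re
from typing import Iterable, Optional

# Hypothetical, simplified pattern; the provider's real regex may differ.
JOB_ID_PATTERN = re.compile(r"Created job with id: \[(?P<job_id>[^\]\s]+)\]")


def read_until_job_id(lines: Iterable[str]) -> Optional[str]:
    """Consume process output line by line and stop early on a job id.

    Returns the job id as soon as one line matches, or None when the
    output is exhausted (the process ended) without any match.
    """
    for line in lines:
        match = JOB_ID_PATTERN.search(line)
        if match:
            # Early exit: the caller can now wait via the Dataflow API.
            return match.group("job_id")
    # No id was ever printed: the loop ran until the process finished.
    return None
```

For example, `read_until_job_id(["starting", "Created job with id: [abc-123]"])` 
returns `"abc-123"`, while output with no matching line yields `None` only 
after every line has been read.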
   
   I see only one scenario in which `deferrable` mode can fail: without a 
`JobID`, this infinite loop runs to the end and the job finishes successfully. 
After that, the operator tries to start `deferrable` mode and fails because the 
`JobID` is empty. In `non-deferrable` mode everything is fine because, for 
[wait_for_done](https://github.com/apache/airflow/blob/main/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L1249-L1257),
 the `JobID` can be `None`. I think this may be your scenario, but I need an 
equivalent of your pipeline script to reproduce it.
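
   The suspected divergence can be sketched as follows. This is only an 
illustration under stated assumptions: `start_deferrable` and `wait_sync` are 
hypothetical stand-ins, not the operator's or hook's real functions, and the 
returned strings only describe what each path would do.

```python
from typing import Optional


def start_deferrable(job_id: Optional[str]) -> str:
    # Assumption: the trigger needs a concrete job id to poll.
    if not job_id:
        raise ValueError("deferrable mode requires a Dataflow job id")
    return f"deferred on job {job_id}"


def wait_sync(job_id: Optional[str], job_name: str) -> str:
    # Assumption: with no id, the sync path falls back to matching by name.
    if job_id is None:
        return f"monitoring all jobs whose name starts with {job_name!r}"
    return f"monitoring job {job_id}"
```

With `job_id=None`, `wait_sync` still has a name to fall back on, whereas 
`start_deferrable` raises, which matches the failure described in this thread.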
   
   Could you please share the Apache Beam provider version you use and the 
code to reproduce this issue?



