evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3325171370


##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None

Review Comment:
   Let me explain a bit how I arrived here. On an Airflow cluster I maintain, I 
noticed Python Beam jobs running with `deferrable=False`, so I switched that 
flag to `True` to avoid wasting worker resources. The next day the jobs failed 
while transitioning to async triggers because their STDOUT didn't contain the 
job ID. In sync mode, a missing job ID doesn't prevent the task from 
succeeding:
   
   `_DataflowJobsController.wait_for_done` polls `self._refresh_jobs()`:
   
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L532-L542
   
   `_refresh_jobs` calls `self._get_current_jobs()`:
   
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L465-L471
   
   `_get_current_jobs`, when no `_job_id` is set, calls 
`self._fetch_jobs_by_prefix_name(self._job_name.lower())`:
   
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L328-L339
   
   `_fetch_jobs_by_prefix_name` calls `self._fetch_all_jobs()` and returns 
every prefix-matched job (archived and running, with no terminal-state filter):
   
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L460-L463
   
   So today's sync path already silently picks up every prefix-matched job 
whenever the job ID regex finds no match in STDOUT.
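   
   Roughly, that fallback behaves like the simplified sketch below. This is not 
the actual hook code: `Job` and `get_current_jobs` are made-up names for 
illustration, and the real controller works on API response dicts.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Job:
    # Hypothetical stand-in for a Dataflow job record.
    id: str
    name: str
    state: str


def get_current_jobs(all_jobs: list[Job], job_name: str, job_id: str | None = None) -> list[Job]:
    """Simplified model of the lookup: by id if known, else by name prefix."""
    if job_id is not None:
        return [job for job in all_jobs if job.id == job_id]
    # No job id: fall back to prefix matching, with no filter on job state.
    prefix = job_name.lower()
    return [job for job in all_jobs if job.name.startswith(prefix)]


jobs = [
    Job("1", "etl-daily-abc", "JOB_STATE_DONE"),
    Job("2", "etl-daily-def", "JOB_STATE_RUNNING"),
    Job("3", "other", "JOB_STATE_RUNNING"),
]
# Both "etl-daily-*" jobs come back, including the finished one.
matched = get_current_jobs(jobs, "ETL-daily")
```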
   
   With the default `append_job_name=True`, the job name is unique, so the job 
ID will be retrieved.
   But you are right, it is a degradation: for jobs that don't have unique 
names but do print their IDs to the console, the job ID will go missing.
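   
   To make the two cases concrete, here is a sketch of the "exactly one match" 
rule using plain dicts. The helper name `single_job_id` and the sample job 
names are assumptions for illustration only, not code from this PR.

```python
from __future__ import annotations


def single_job_id(jobs: list[dict], prefix: str) -> str | None:
    """Return the id only when exactly one job name starts with ``prefix``."""
    matches = [job for job in jobs if job["name"].startswith(prefix)]
    if len(matches) != 1:
        # Zero or several candidates: the id cannot be determined safely.
        return None
    return matches[0]["id"]


# Unique name (as with a generated suffix): the id is found.
unique = [{"id": "a1", "name": "wordcount-1f3c"}]
# Shared name across runs: the lookup is ambiguous and yields None.
shared = [{"id": "b1", "name": "wordcount"}, {"id": "b2", "name": "wordcount"}]

unique_id = single_job_id(unique, "wordcount-1f3c")
shared_id = single_job_id(shared, "wordcount")
```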
   
   I guess an alternative would be to replicate the sync mode's behavior in the 
async path, which currently fails without a `job_id`. However, that means the 
XCom and the link to the job would stay broken.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
