evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3380169593


##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", 
name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None

Review Comment:
   Makes sense! Here is the ticket 
https://github.com/apache/airflow/issues/68279



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to