MaksYermak commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3324686077


##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None

Review Comment:
   @evgeniy-b as I understand it, when users run two or more Jobs with the same 
name in parallel, or when a Job with this name already exists in Dataflow, this 
code returns `None` as the Job ID. Please correct me if I am wrong.
   
   In the current callback-based logic, the code parses the Apache Beam logs for 
the Job ID and, once it finds it, starts the waiting process in deferrable or 
non-deferrable mode. This means that we always have a unique Job ID.
   
   This new logic looks like a breaking change to me, because it returns `None` 
as the Job ID when there are two or more Jobs with the same name in Dataflow. 
This is a likely scenario for most of our users, because finished Jobs cannot be 
removed from Dataflow; the user can only archive them. In addition, our 
`_fetch_all_jobs` method does not distinguish between finished and running Jobs 
and returns all Jobs with the same name.
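
   To illustrate the concern, here is a minimal standalone sketch. It is not the PR's code and does not call Airflow or the Dataflow API. The job dictionaries, the `TERMINAL_STATES` set, and both helper functions are assumptions made for illustration; the part of the diff after `return None` is not shown, so returning the single job's `id` is also an assumption.

```python
# Assumed set of terminal Dataflow job states, for illustration only.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def pick_job_id_strict(jobs):
    """Mimic the `len(jobs) != 1` check from the diff (hypothetical helper)."""
    if len(jobs) != 1:
        return None
    return jobs[0]["id"]


def pick_job_id_active_only(jobs):
    """Hypothetical alternative: drop finished jobs before counting matches."""
    active = [job for job in jobs if job.get("currentState") not in TERMINAL_STATES]
    if len(active) != 1:
        return None
    return active[0]["id"]


# One finished run and one new run that share the same job name.
jobs = [
    {"id": "old-run", "name": "my-job", "currentState": "JOB_STATE_DONE"},
    {"id": "new-run", "name": "my-job", "currentState": "JOB_STATE_RUNNING"},
]

print(pick_job_id_strict(jobs))       # None
print(pick_job_id_active_only(jobs))  # new-run
```

   With a finished Job of the same name still listed, the strict check yields `None`, while filtering on state first would still find the new run. Filtering would not help when two Jobs with the same name are genuinely running in parallel; in that case only an id taken from the launch itself (as the log parsing does today) is unambiguous.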


