evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3339090804
##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None
Review Comment:
I think the changes to job ID detection from the output should be reverted.
While it is awkward in principle, it is the only way (?) to reliably get the ID
when job names are not unique. Name-based ID detection can then be used as a
fallback, but only when `append_job_name=True`. And if the trigger receives an
empty job ID, it should fall back to polling the status of all jobs that match
the name and are not in a terminal state.
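
To make the proposed order of fallbacks concrete, here is a rough sketch. It is
not the hook's actual code: `resolve_job_id`, its parameters, and the
`TERMINAL_STATES` set are hypothetical names chosen for illustration, and the
job dicts are assumed to carry the `id`, `name` and `currentState` fields that
the Dataflow REST API returns.

```python
from __future__ import annotations

# Assumed set of terminal states, for illustration only; the provider
# may define this differently.
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_DRAINED",
    "JOB_STATE_UPDATED",
}


def resolve_job_id(
    job_id_from_output: str | None,
    name: str,
    append_job_name: bool,
    jobs: list[dict],
) -> tuple[str | None, list[dict]]:
    """Hypothetical helper: return (job_id, jobs_to_poll_by_name)."""
    # 1. Prefer the id parsed from the process output: it stays
    #    unambiguous even when job names are not unique.
    if job_id_from_output:
        return job_id_from_output, []

    # Jobs whose name starts with the prefix and that are still active.
    active = [
        job
        for job in jobs
        if job["name"].startswith(name)
        and job["currentState"] not in TERMINAL_STATES
    ]

    # 2. Name-based lookup, only when a unique suffix was appended and
    #    exactly one active job matches.
    if append_job_name and len(active) == 1:
        return active[0]["id"], []

    # 3. No id: the trigger polls every matching non-terminal job.
    return None, active
```

With an empty job ID the second element of the result is the list of jobs the
trigger would poll; with a resolved ID that list is empty.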
@MaksYermak what's your take on this?