evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3372309821
##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None
Review Comment:
I understand that job names are not unique and totally agree that using
names for status checks is awkward.
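Because names are not unique, the lookup above only trusts a name when it resolves to exactly one job. A minimal sketch of that rule, as the docstring describes it, assuming a simplified in-memory job list (`Job` and `resolve_unique_job_id` are hypothetical names for illustration, not provider code):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Job:
    # Hypothetical minimal stand-in for a Dataflow job resource.
    id: str
    name: str


def resolve_unique_job_id(jobs: list[Job], name: str) -> str | None:
    """Return the id only if exactly one job name starts with ``name``."""
    matches = [job for job in jobs if job.name.startswith(name)]
    if len(matches) != 1:
        # Zero matches: nothing to monitor. Several: ambiguous, so refuse to guess.
        return None
    return matches[0].id
```

With jobs named `wordcount-a1b2`, `wordcount-c3d4` and `etl-x`, the prefix `etl` resolves to a single id, while `wordcount` is ambiguous and yields `None` instead of silently picking one.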
> This solution with callbacks was introduced at the beginning of the Apache
> Beam operators' life, and removing it completely is a breaking change for users.
Fair. I'm not proposing to remove it anymore because it would be a
regression.
> This code uses a callback to get the Job ID from STDOUT for the Dataflow
> runner before starting to wait in non-deferrable or deferrable modes.
It works only when STDOUT contains the job ID. In my case the job's output
didn't include it, so `jobId=None`. The divergence is in how deferrable and
synchronous worker modes treat a missing job ID: the trigger fails immediately,
while the synchronous worker path lists all jobs by name and monitors the
statuses of all matching jobs (I linked the exact code lines in the previous
comment).
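A simplified model of the two behaviors when the job ID is missing, plus one possible way a name lookup such as `fetch_job_id_by_name` could close the gap before deferring. All function names here are hypothetical; this is a sketch of the described control flow, not the provider's actual trigger or hook code:

```python
from __future__ import annotations

from typing import Callable


class JobIdMissingError(RuntimeError):
    """Hypothetical error standing in for the trigger's immediate failure."""


def trigger_path(job_id: str | None) -> str:
    # Deferrable-style behavior: without an id there is nothing to poll, so fail.
    if job_id is None:
        raise JobIdMissingError("No Dataflow job id to poll")
    return job_id


def worker_path(
    job_id: str | None,
    name: str,
    list_job_ids_by_name: Callable[[str], list[str]],
) -> list[str]:
    # Synchronous-style behavior: fall back to every job matching the name.
    if job_id is not None:
        return [job_id]
    return list_job_ids_by_name(name)


def trigger_path_with_lookup(
    job_id: str | None,
    name: str,
    lookup_unique_id: Callable[[str], str | None],
) -> str:
    # Hypothetical fix: resolve a missing id by name first and fail only
    # if the lookup is also inconclusive (no match or several matches).
    if job_id is None:
        job_id = lookup_unique_id(name)
    return trigger_path(job_id)
```

The same `jobId=None` input therefore succeeds in `worker_path` but raises in `trigger_path`, which is the inconsistency being reported.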
It's a real bug: every deferrable-mode task whose job output doesn't match the
Job ID regex will fail.
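To illustrate how a regex-based callback ends up with no ID, here is a sketch of extracting a job ID from runner output lines. The pattern and log format are assumptions for illustration only; the provider's real regex may differ:

```python
from __future__ import annotations

import re

# Illustrative pattern only (assumed log format); not the provider's actual regex.
JOB_ID_RE = re.compile(r"Submitted job: (?P<job_id>[\w-]+)")


def extract_job_id(lines: list[str]) -> str | None:
    """Return the first job id found in the runner output, or None."""
    for line in lines:
        match = JOB_ID_RE.search(line)
        if match:
            return match.group("job_id")
    # No line matched: this is the jobId=None case.
    return None
```

If the runner never prints a line in the expected shape, the function returns `None`, and from there the outcome depends entirely on which code path handles the missing ID.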
--
This is an automated message from the Apache Git Service.