evgeniy-b commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3325171370
##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None
Review Comment:
Let me explain how I arrived here. On an Airflow cluster I maintain, I
noticed Python Beam jobs running with `deferrable=False`, so I set
`deferrable=True` to avoid wasting worker resources. The next day the jobs
failed while transitioning to async triggers because their stdout didn't
contain the job ID. In sync mode, a missing job ID doesn't prevent the task
from succeeding:
`_DataflowJobsController.wait_for_done` polls `self._refresh_jobs()`:
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L532-L542
`_refresh_jobs` calls `self._get_current_jobs()`:
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L465-L471
`_get_current_jobs`, when no `_job_id` is set, calls
`self._fetch_jobs_by_prefix_name(self._job_name.lower())`:
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L328-L339
`_fetch_jobs_by_prefix_name` calls `self._fetch_all_jobs()` and returns
every prefix-matched job (archived + running, no terminal-state filter):
https://github.com/apache/airflow/blob/a7174b5bc6dcffabeb8ab72e2256c7cfe9035a13/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L460-L463
So today's sync path already silently picks up every prefix-matched job
whenever the job ID regex fails to match.
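To make the chain above concrete, here is a minimal sketch of the fallback. It is a simplified stand-in, not the hook's actual code: the two functions are standalone versions of the private methods named above, `JOBS` is made-up data, and only the field names (`id`, `name`, `currentState`) follow the Dataflow job resource.

```python
from __future__ import annotations

# Hypothetical job records; only the field names mirror the Dataflow job resource.
JOBS = [
    {"id": "2024-a", "name": "wordcount-1a2b3c4d", "currentState": "JOB_STATE_DONE"},
    {"id": "2024-b", "name": "wordcount-9f8e7d6c", "currentState": "JOB_STATE_RUNNING"},
    {"id": "2024-c", "name": "other-job", "currentState": "JOB_STATE_RUNNING"},
]


def fetch_jobs_by_prefix_name(all_jobs: list[dict], prefix_name: str) -> list[dict]:
    """Return every job whose name starts with the prefix, whatever its state."""
    return [job for job in all_jobs if job["name"].startswith(prefix_name)]


def get_current_jobs(
    all_jobs: list[dict], job_name: str, job_id: str | None = None
) -> list[dict]:
    """Use the job ID when known; otherwise fall back to prefix matching."""
    if job_id is not None:
        return [job for job in all_jobs if job["id"] == job_id]
    # No job ID was captured, so match on the lower-cased name prefix.
    return fetch_jobs_by_prefix_name(all_jobs, job_name.lower())


# Without a job ID, both the finished and the running "wordcount" jobs match.
matched = get_current_jobs(JOBS, "WordCount")
```

Here `matched` contains the finished job as well as the running one, which is the "no terminal-state filter" behavior described above.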
With the default `append_job_name=True`, the job name is unique, so the job ID
can be retrieved by name.
But you are right, it is a degradation: for jobs that don't have unique names
but do print their IDs to the console, the job ID will go missing.
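A rough illustration of the two cases, under stated assumptions: `pick_single_job_id` is a hypothetical helper that mirrors the `len(jobs) != 1` check in the hunk above, the job records and the name suffix are invented, and returning the single job's `id` is my assumption because the quoted hunk stops before that line.

```python
from __future__ import annotations


def pick_single_job_id(jobs: list[dict], name: str) -> str | None:
    """Return the ID only when exactly one job name starts with ``name``."""
    matches = [job for job in jobs if job["name"].startswith(name.lower())]
    if len(matches) != 1:
        # Zero or several candidates: the lookup is ambiguous, so give up.
        return None
    return matches[0]["id"]


# Unique name (illustrative suffix): exactly one match, so the ID is found.
unique_jobs = [{"id": "job-1", "name": "etl-1a2b3c4d"}]
found = pick_single_job_id(unique_jobs, "etl-1a2b3c4d")

# Reused name: two jobs share the prefix, so the lookup yields None.
shared_jobs = [{"id": "job-1", "name": "etl"}, {"id": "job-2", "name": "etl"}]
ambiguous = pick_single_job_id(shared_jobs, "etl")
```

In the second case the ID is lost even if the job printed it to the console, which is the degradation in question.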
I guess an alternative could be to replicate the sync mode's behavior in the
async path, which currently fails without a `job_id`. However, that would mean
the XCom value and the link to the job stay broken.