MaksYermak commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3372609757
##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
)
return jobs_controller.is_job_running()
+ @GoogleBaseHook.fallback_to_default_project_id
+ def fetch_job_id_by_name(
+ self,
+ name: str,
+ project_id: str,
+ location: str = DEFAULT_DATAFLOW_LOCATION,
+ ) -> str | None:
+ """
+ Look up a single Dataflow job id by name prefix.
+
+ Returns the id when exactly one active job's name starts with ``name``;
+ ``None`` otherwise.
+ """
+ jobs_controller = _DataflowJobsController(
+ dataflow=self.get_conn(),
+ project_number=project_id,
+ name=name,
+ location=location,
+ poll_sleep=self.poll_sleep,
+ drain_pipeline=self.drain_pipeline,
+ num_retries=self.num_retries,
+ cancel_timeout=self.cancel_timeout,
+ )
+ try:
+ jobs = jobs_controller._get_current_jobs()
+ except Exception:
+ self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+ return None
+ if len(jobs) != 1:
+ return None
Review Comment:
> It works only when STDOUT contains the job ID. In my case the job's output didn't include it, so jobId=None. The divergence is in how deferrable and worker mode treat missing job IDs: the trigger fails immediately, while the sync worker path lists all jobs by name and monitors the statuses of all matching jobs (I linked the exact code lines in the previous comment).
>
> It's a real bug: all tasks in deferrable mode whose job output doesn't match the Job ID regex will fail.

Hmm, I still do not understand how, in your case, deferrable mode could start without a `JobID`. The current code has this [logic](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py#L149-L150) in `process_fd` and this [logic](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py#L190-L192) in `run_beam_command`. As you can see, there is a `while True` [loop](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py#L182-L195) that reads logs from the Beam run process until the job finishes. Only when a `JobID` is present does the code leave this loop and start waiting for the job through the Dataflow API, in either `deferrable` or `non-deferrable` mode. Otherwise, if there is no `JobID`, the code runs your job in `non-deferrable` mode until it finishes and never uses the Dataflow API to check its status.

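The control flow described above can be sketched as follows. This is a simplified model, not the provider's code: process output is treated as an iterable of lines, and `JOB_ID_PATTERN` and `read_until_job_id` are hypothetical names, with the pattern standing in for the provider's real job-ID regex. The function returns as soon as a job ID appears (the point where the hook would hand off to Dataflow API polling) and returns `None` only after the output is exhausted, i.e. after the job has already run to completion in the Beam process.

```python
import re
from typing import Iterable, Optional

# Hypothetical pattern for illustration; the provider uses its own job-ID regex.
JOB_ID_PATTERN = re.compile(r"Created job with id: \[(?P<job_id>[^\]\s]+)\]")


def read_until_job_id(log_lines: Iterable[str]) -> Optional[str]:
    """Consume process output until a job ID appears or the output ends.

    A returned ID is the point where the hook would stop reading logs and
    wait on the job through the Dataflow API. ``None`` means the output
    ended without an ID, i.e. the process ran the job to completion.
    """
    for line in log_lines:
        match = JOB_ID_PATTERN.search(line)
        if match:
            # Leave the loop early: from here on, status comes from the API.
            return match.group("job_id")
    # Output exhausted without an ID: nothing left for the API to wait on.
    return None
```

Passing an iterator makes the early exit visible: after `read_until_job_id(it)` returns an ID, the lines following the match are still unread in `it`.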
I see only one scenario in which `deferrable` mode can fail: without a `JobID`, this infinite loop runs to the end and the job finishes successfully. After that, the operator tries to start `deferrable` mode and fails because the `JobID` is empty. In `non-deferrable` mode everything is fine, because [wait_for_done](https://github.com/apache/airflow/blob/main/providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py#L1249-L1257) accepts a `JobID` of `None`. I think this may be your scenario, but I need an equivalent of your pipeline script to reproduce it.

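The scenario above can be modelled with a small, self-contained sketch of the two waiting paths. It is a simplification under stated assumptions, not the provider's implementation: `Job`, `single_job_id_by_prefix`, `wait_non_deferrable`, and `wait_deferrable` are hypothetical names. The non-deferrable model, like `wait_for_done`, accepts `None` and monitors every job whose name matches; the deferrable model needs one concrete ID and fails otherwise. The optional name fallback follows the rule in the `fetch_job_id_by_name` docstring from the diff: use the ID only when exactly one job's name starts with the given prefix.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Job:
    """Hypothetical stand-in for a Dataflow job record (only id and name)."""

    id: str
    name: str


def single_job_id_by_prefix(name: str, jobs: Sequence[Job]) -> Optional[str]:
    """Return a job ID only if exactly one job name starts with ``name``."""
    matches = [job.id for job in jobs if job.name.startswith(name)]
    return matches[0] if len(matches) == 1 else None


def wait_non_deferrable(job_id: Optional[str], name: str, jobs: Sequence[Job]) -> List[str]:
    """Model of the sync path: tolerates job_id=None.

    Without an ID it monitors every job whose name matches the prefix.
    """
    if job_id:
        return [job_id]
    return [job.id for job in jobs if job.name.startswith(name)]


def wait_deferrable(
    job_id: Optional[str],
    name: str,
    jobs: Sequence[Job],
    use_name_fallback: bool = False,
) -> str:
    """Model of the deferrable path: the trigger needs one concrete job ID."""
    if not job_id and use_name_fallback:
        # Optional fallback: resolve the ID by name, single match only.
        job_id = single_job_id_by_prefix(name, jobs)
    if not job_id:
        # Corresponds to the immediate failure reported for deferrable mode.
        raise ValueError("Cannot defer: Dataflow job ID is empty")
    return job_id
```

For example, with `jobs = [Job("id-1", "my-pipeline-abc")]`, `wait_non_deferrable(None, "my-pipeline", jobs)` returns `["id-1"]`, while `wait_deferrable(None, "my-pipeline", jobs)` raises `ValueError` unless `use_name_fallback=True` is passed. In the scenario above the job has already finished before deferral, so if the name lookup only considers active jobs, as the diff's docstring states, `jobs` would be empty and the deferrable path would still fail.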
Could you please share the Apache Beam provider version you are using and code that reproduces this issue?
--
This is an automated message from the Apache Git Service.