TobKed commented on a change in pull request #12814:
URL: https://github.com/apache/airflow/pull/12814#discussion_r563147783



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -50,6 +47,35 @@
 T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
 
 
+def process_line_and_extract_dataflow_job_id_callback(
+    on_new_job_id_callback: Optional[Callable[[str], None]]
+) -> Callable[[str], None]:
+    """
+    Returns callback which triggers function passed as `on_new_job_id_callback` when Dataflow job_id is found.
+    To be used for `process_line_callback` in
+    :py:class:`~airflow.providers.apache.beam.hooks.beam.BeamCommandRunner`
+
+    :param on_new_job_id_callback: Callback called when the job ID is known
+    :type on_new_job_id_callback: callback
+    """
+
+    def _process_line_and_extract_job_id(
+        line: str,
+        # on_new_job_id_callback: Optional[Callable[[str], None]]
+    ) -> None:
+        # Job id info: https://goo.gl/SE29y9.
+        matched_job = JOB_ID_PATTERN.search(line)
+        if matched_job:
+            job_id = matched_job.group("job_id_java") or matched_job.group("job_id_python")

Review comment:
   The matching groups differ because the Java and Python commands print the job ID to stdout in different formats:
   ```
   JOB_ID_PATTERN = re.compile(
       r"Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]"
   )
   ```
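
To illustrate, here is a minimal sketch of how the two groups behave. The helper `extract_job_id`, the sample log lines, and the job ID value are all made up for this example; real runner output may carry different prefixes. Only one alternative of the pattern can match a given line, so the other group is `None` and the `or` picks whichever one is set:

```python
import re
from typing import Optional

JOB_ID_PATTERN = re.compile(
    r"Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]"
)


def extract_job_id(line: str) -> Optional[str]:
    """Hypothetical helper: return the Dataflow job ID found in ``line``, if any."""
    matched_job = JOB_ID_PATTERN.search(line)
    if matched_job:
        # Exactly one alternative matched, so the other named group is None.
        return matched_job.group("job_id_java") or matched_job.group("job_id_python")
    return None


# Assumed sample lines; the job ID below is invented for illustration.
java_line = "INFO: Submitted job: 2021-01-01_00_00_00-1234567890"
python_line = "INFO:root:Created job with id: [2021-01-01_00_00_00-1234567890]"

print(extract_job_id(java_line))
print(extract_job_id(python_line))
print(extract_job_id("some unrelated log line"))
```

With these inputs, both the Java-style and the Python-style line yield the same ID string, and the unrelated line yields `None`.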





