josh-fell commented on code in PR #29503:
URL: https://github.com/apache/airflow/pull/29503#discussion_r1108895095


##########
airflow/providers/apache/beam/hooks/beam.py:
##########
@@ -81,81 +81,85 @@ def beam_options_to_args(options: dict) -> list[str]:
     return args
 
 
-class BeamCommandRunner(LoggingMixin):
+def process_fd(
+    proc,
+    fd,
+    log: logging.Logger,
+    process_line_callback: Callable[[str], None] | None = None,
+):
+    """
+    Prints output to logs.
+
+    :param proc: subprocess.
+    :param fd: File descriptor.
+    :param process_line_callback: Optional callback which can be used to 
process
+        stdout and stderr to detect job id.
+    :param log: logger.
+    """
+    if fd not in (proc.stdout, proc.stderr):
+        raise Exception("No data in stderr or in stdout.")
+
+    fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info}
+    func_log = fd_to_log[fd]
+
+    while True:
+        line = fd.readline().decode()
+        if not line:
+            return
+        if process_line_callback:
+            process_line_callback(line)
+        func_log(line.rstrip("\n"))
+
+
+def run_beam_command(
+    cmd: list[str],
+    log: logging.Logger,
+    process_line_callback: Callable[[str], None] | None = None,
+    working_directory: str | None = None,
+) -> None:
     """
     Class responsible for running pipeline command in subprocess

Review Comment:
   ```suggestion
       Function responsible for running pipeline command in subprocess.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to