potiuk commented on code in PR #24236:
URL: https://github.com/apache/airflow/pull/24236#discussion_r894749991


##########
dev/breeze/src/airflow_breeze/commands/testing_commands.py:
##########
@@ -112,6 +129,93 @@ def docker_compose_tests(
     sys.exit(return_code)
 
 
+class MonitoringThread(Thread):
+    """Thread class with a stop() method. The thread itself has to check
+    regularly for the stopped() condition."""
+
+    def __init__(self, title: str, file_name: str):
+        super().__init__(target=self.peek_percent_at_last_lines_of_file, 
daemon=True)
+        self._stop_event = Event()
+        self.title = title
+        self.file_name = file_name
+
+    def peek_percent_at_last_lines_of_file(self) -> None:
+        max_line_length = 400
+        matcher = re.compile(r"^.*\[([^\]]*)\]$")
+        while not self.stopped():
+            if os.path.exists(self.file_name):
+                try:
+                    with open(self.file_name, 'rb') as temp_f:
+                        temp_f.seek(-(max_line_length * 2), os.SEEK_END)
+                        tail = temp_f.read().decode()
+                    try:
+                        two_last_lines = tail.splitlines()[-2:]
+                        previous_no_ansi_line = escape_ansi(two_last_lines[0])
+                        m = matcher.match(previous_no_ansi_line)
+                        if m:
+                            get_console().print(f"[info]{self.title}:[/] 
{m.group(1).strip()}")
+                            print(f"\r{two_last_lines[0]}\r")
+                            print(f"\r{two_last_lines[1]}\r")
+                    except IndexError:
+                        pass
+                except OSError as e:
+                    if e.errno == errno.EINVAL:
+                        pass
+                    else:
+                        raise
+            sleep(5)
+
+    def stop(self):
+        self._stop_event.set()
+
+    def stopped(self):
+        return self._stop_event.is_set()
+
+
+def escape_ansi(line):
+    ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
+    return ansi_escape.sub('', line)
+
+
+def run_with_progress(
+    cmd: List[str],
+    env_variables: Dict[str, str],
+    test_type: str,
+    python: str,
+    backend: str,
+    version: str,
+    verbose: bool,
+    dry_run: bool,
+) -> RunCommandResult:
+    title = f"Running tests: {test_type}, Python: {python}, Backend: 
{backend}:{version}"
+    try:
+        with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
+            get_console().print(f"[info]Starting test = {title}[/]")
+            thread = MonitoringThread(title=title, file_name=f.name)
+            thread.start()
+            try:
+                result = run_command(
+                    cmd,
+                    verbose=verbose,
+                    dry_run=dry_run,
+                    env=env_variables,
+                    check=False,
+                    stdout=f,
+                    stderr=subprocess.STDOUT,

Review Comment:
   It is captured in the same file. `stderr=subprocess.STDOUT` literally 
means "send stderr to the same place where stdout is sent". If you wanted 
to send it to the actual stdout, you would pass `sys.stdout` instead. 
   
   See 
https://docs.python.org/3/library/subprocess.html#subprocess.CompletedProcess.stdout
   
   > If you ran the process with stderr=subprocess.STDOUT, stdout and stderr 
will be combined in this attribute, and 
[stderr](https://docs.python.org/3/library/subprocess.html#subprocess.CompletedProcess.stderr)
 will be None.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to