Bowrna commented on code in PR #24236:
URL: https://github.com/apache/airflow/pull/24236#discussion_r894113605


##########
dev/breeze/src/airflow_breeze/commands/testing_commands.py:
##########
@@ -112,6 +129,93 @@ def docker_compose_tests(
     sys.exit(return_code)
 
 
+class MonitoringThread(Thread):
+    """Thread class with a stop() method. The thread itself has to check
+    regularly for the stopped() condition."""
+
+    def __init__(self, title: str, file_name: str):
+        super().__init__(target=self.peek_percent_at_last_lines_of_file, 
daemon=True)
+        self._stop_event = Event()
+        self.title = title
+        self.file_name = file_name
+
+    def peek_percent_at_last_lines_of_file(self) -> None:
+        max_line_length = 400
+        matcher = re.compile(r"^.*\[([^\]]*)\]$")
+        while not self.stopped():
+            if os.path.exists(self.file_name):
+                try:
+                    with open(self.file_name, 'rb') as temp_f:
+                        temp_f.seek(-(max_line_length * 2), os.SEEK_END)
+                        tail = temp_f.read().decode()
+                    try:
+                        two_last_lines = tail.splitlines()[-2:]
+                        previous_no_ansi_line = escape_ansi(two_last_lines[0])
+                        m = matcher.match(previous_no_ansi_line)
+                        if m:
+                            get_console().print(f"[info]{self.title}:[/] 
{m.group(1).strip()}")
+                            print(f"\r{two_last_lines[0]}\r")
+                            print(f"\r{two_last_lines[1]}\r")
+                    except IndexError:
+                        pass
+                except OSError as e:
+                    if e.errno == errno.EINVAL:
+                        pass
+                    else:
+                        raise
+            sleep(5)
+
+    def stop(self):
+        self._stop_event.set()
+
+    def stopped(self):
+        return self._stop_event.is_set()
+
+
+def escape_ansi(line):
+    ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
+    return ansi_escape.sub('', line)
+
+
+def run_with_progress(
+    cmd: List[str],
+    env_variables: Dict[str, str],
+    test_type: str,
+    python: str,
+    backend: str,
+    version: str,
+    verbose: bool,
+    dry_run: bool,
+) -> RunCommandResult:
+    title = f"Running tests: {test_type}, Python: {python}, Backend: 
{backend}:{version}"
+    try:
+        with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
+            get_console().print(f"[info]Starting test = {title}[/]")
+            thread = MonitoringThread(title=title, file_name=f.name)
+            thread.start()
+            try:
+                result = run_command(
+                    cmd,
+                    verbose=verbose,
+                    dry_run=dry_run,
+                    env=env_variables,
+                    check=False,
+                    stdout=f,
+                    stderr=subprocess.STDOUT,

Review Comment:
   @potiuk the `stderr` part is sent to STDOUT and not captured in the file 
right? So if there is an error in the process, we print directly rather than 
capturing from the file. Am I right in understanding that part?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to