Bowrna commented on code in PR #24236:
URL: https://github.com/apache/airflow/pull/24236#discussion_r894117097


##########
dev/breeze/src/airflow_breeze/commands/testing_commands.py:
##########
@@ -112,6 +129,93 @@ def docker_compose_tests(
     sys.exit(return_code)
 
 
+class MonitoringThread(Thread):
+    """Thread class with a stop() method. The thread itself has to check
+    regularly for the stopped() condition."""
+
+    def __init__(self, title: str, file_name: str):
+        super().__init__(target=self.peek_percent_at_last_lines_of_file, daemon=True)
+        self._stop_event = Event()
+        self.title = title
+        self.file_name = file_name
+
+    def peek_percent_at_last_lines_of_file(self) -> None:
+        max_line_length = 400
+        matcher = re.compile(r"^.*\[([^\]]*)\]$")
+        while not self.stopped():
+            if os.path.exists(self.file_name):
+                try:
+                    with open(self.file_name, 'rb') as temp_f:
+                        temp_f.seek(-(max_line_length * 2), os.SEEK_END)
+                        tail = temp_f.read().decode()
+                    try:
+                        two_last_lines = tail.splitlines()[-2:]
+                        previous_no_ansi_line = escape_ansi(two_last_lines[0])
+                        m = matcher.match(previous_no_ansi_line)
+                        if m:
+                            get_console().print(f"[info]{self.title}:[/] {m.group(1).strip()}")
+                            print(f"\r{two_last_lines[0]}\r")
+                            print(f"\r{two_last_lines[1]}\r")
+                    except IndexError:
+                        pass
+                except OSError as e:
+                    if e.errno == errno.EINVAL:
+                        pass
+                    else:
+                        raise
+            sleep(5)
+
+    def stop(self):
+        self._stop_event.set()
+
+    def stopped(self):
+        return self._stop_event.is_set()
+
+
+def escape_ansi(line):
+    ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
+    return ansi_escape.sub('', line)
+
+
+def run_with_progress(
+    cmd: List[str],
+    env_variables: Dict[str, str],
+    test_type: str,
+    python: str,
+    backend: str,
+    version: str,
+    verbose: bool,
+    dry_run: bool,
+) -> RunCommandResult:
+    title = f"Running tests: {test_type}, Python: {python}, Backend: {backend}:{version}"
+    try:
+        with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
+            get_console().print(f"[info]Starting test = {title}[/]")
+            thread = MonitoringThread(title=title, file_name=f.name)
+            thread.start()
+            try:
+                result = run_command(
+                    cmd,
+                    verbose=verbose,
+                    dry_run=dry_run,
+                    env=env_variables,
+                    check=False,
+                    stdout=f,
+                    stderr=subprocess.STDOUT,
+                )
+            finally:
+                thread.stop()
+                thread.join()
+        with ci_group(
+            f"Result of {title}", message_type="[success]" if result.returncode == 0 else "[error]"
+        ):
+            with open(f.name) as f:
+                shutil.copyfileobj(f, sys.stdout)

Review Comment:
   @potiuk Why do we need to copy the file to stdout rather than simply 
printing out the file content?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to