potiuk commented on code in PR #24236:
URL: https://github.com/apache/airflow/pull/24236#discussion_r894747966
##########
dev/breeze/src/airflow_breeze/commands/testing_commands.py:
##########
@@ -112,6 +129,93 @@ def docker_compose_tests(
sys.exit(return_code)
class MonitoringThread(Thread):
    """Daemon thread that periodically echoes progress read from a growing output file.

    Every few seconds the thread reads the tail of ``file_name``. When the second-to-last
    line (with ANSI escape sequences removed) ends with a ``[...]`` group, the text inside
    the brackets is printed after ``title``, and the last two raw lines are echoed as well.

    The thread stops cooperatively: call :meth:`stop`, then ``join()``. The pause between
    polls waits on the stop event, so ``join()`` returns promptly after :meth:`stop`
    instead of waiting for the current poll interval to run out.
    """

    def __init__(self, title: str, file_name: str):
        super().__init__(target=self.peek_percent_at_last_lines_of_file, daemon=True)
        self._stop_event = Event()
        self.title = title
        self.file_name = file_name

    def peek_percent_at_last_lines_of_file(self) -> None:
        """Poll the tail of ``self.file_name`` and print progress until :meth:`stop` is called."""
        max_line_length = 400
        poll_interval_seconds = 5
        matcher = re.compile(r"^.*\[([^\]]*)\]$")
        while not self.stopped():
            # Two maximum-length lines are enough to contain the last two lines of output.
            tail = self._read_tail(max_line_length * 2)
            if tail is not None:
                two_last_lines = tail.splitlines()[-2:]
                # Only print once two lines are available, to avoid emitting a half update.
                if len(two_last_lines) == 2:
                    previous_line, last_line = two_last_lines
                    m = matcher.match(escape_ansi(previous_line))
                    if m:
                        get_console().print(f"[info]{self.title}:[/] {m.group(1).strip()}")
                    print(f"\r{previous_line}\r")
                    print(f"\r{last_line}\r")
            # Unlike sleep(), Event.wait() returns as soon as stop() sets the event.
            self._stop_event.wait(poll_interval_seconds)

    def _read_tail(self, max_bytes: int):
        """Return at most the last ``max_bytes`` bytes of the file decoded as UTF-8 text.

        Returns None when the file does not exist (yet) or the read fails with EINVAL;
        any other OSError is re-raised.
        """
        try:
            with open(self.file_name, 'rb') as temp_f:
                file_size = temp_f.seek(0, os.SEEK_END)
                # Clamp at 0 so files shorter than max_bytes are read whole instead of
                # failing with EINVAL on a negative offset.
                temp_f.seek(max(file_size - max_bytes, 0))
                # The offset may fall inside a multi-byte UTF-8 character; replace the
                # partial bytes rather than letting UnicodeDecodeError kill the thread.
                return temp_f.read().decode(errors='replace')
        except FileNotFoundError:
            return None
        except OSError as e:
            if e.errno == errno.EINVAL:
                return None
            raise

    def stop(self):
        """Ask the monitoring loop to finish; it exits at its next check."""
        self._stop_event.set()

    def stopped(self):
        """Return True once :meth:`stop` has been called."""
        return self._stop_event.is_set()
+
+
def escape_ansi(line):
    """Return ``line`` with ANSI escape sequences (e.g. terminal color codes) stripped.

    Sequences introduced either by ESC followed by a byte in ``@``..``_`` or by a single
    C1 control character (U+0080..U+009F) are removed, together with their parameter,
    intermediate and final bytes.
    """
    return re.sub(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]', '', line)
+
+
+def run_with_progress(
+ cmd: List[str],
+ env_variables: Dict[str, str],
+ test_type: str,
+ python: str,
+ backend: str,
+ version: str,
+ verbose: bool,
+ dry_run: bool,
+) -> RunCommandResult:
+ title = f"Running tests: {test_type}, Python: {python}, Backend:
{backend}:{version}"
+ try:
+ with tempfile.NamedTemporaryFile(mode='w+t', delete=False) as f:
+ get_console().print(f"[info]Starting test = {title}[/]")
+ thread = MonitoringThread(title=title, file_name=f.name)
+ thread.start()
+ try:
+ result = run_command(
+ cmd,
+ verbose=verbose,
+ dry_run=dry_run,
+ env=env_variables,
+ check=False,
+ stdout=f,
+ stderr=subprocess.STDOUT,
+ )
+ finally:
+ thread.stop()
+ thread.join()
+ with ci_group(
+ f"Result of {title}", message_type="[success]" if
result.returncode == 0 else "[error]"
+ ):
+ with open(f.name) as f:
+ shutil.copyfileobj(f, sys.stdout)
Review Comment:
`shutil.copyfileobj` streams the content of the file to stdout in fixed-size chunks,
without ever reading the whole file into memory.
If you read the content into a string and print it, the whole output is loaded into
memory first. The output can potentially be very long (20-30 MB), and loading it into a
string costs that much memory. We have already had a lot of trouble with our tests
exceeding the memory available to run them (that's why some tests are not run when we
use MSSQL or MySQL databases).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]