Anakin100100 commented on issue #49474: URL: https://github.com/apache/arrow/issues/49474#issuecomment-4085405993
I've investigated this more deeply and it's become clear that there is no memory leak here. I initially managed to reproduce the RSS growth, but when I inspected what the RSS actually contained, most of it was [mimalloc](https://github.com/microsoft/mimalloc) pages that are cached for reuse rather than returned to the OS. If you want to run this in an HPC environment, you can force mimalloc to return memory as soon as Arrow releases it by setting:

```bash
export MIMALLOC_PURGE_DELAY=0
```

I modified your example slightly to demonstrate this:

```python
import gc
import os
import tempfile

import numpy as np
import psutil
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as pad
import pyarrow.compute as pc

BATCH_SIZE = 131_072
N_BATCHES = 200


def get_rss_mb():
    return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024


def get_arrow_mb():
    return pa.total_allocated_bytes() / 1024 / 1024


def generate_dataset(base_path: str, n_rows: int = 30_000_000):
    rng = np.random.default_rng(42)
    for a in [1, 2]:
        for b in [1, 2]:
            part_dir = os.path.join(base_path, f"A={a}", f"B={b}")
            os.makedirs(part_dir, exist_ok=True)
            pq.write_table(
                pa.table({"C": rng.standard_normal(n_rows).astype(np.float32)}),
                os.path.join(part_dir, "data.parquet"),
                row_group_size=BATCH_SIZE,
            )


def run(dataset_path: str, img_label: str):
    dataset = pad.dataset(dataset_path, format="parquet", partitioning="hive")
    scanner = dataset.scanner(
        filter=(pc.field("A") == pc.scalar(1)) & (pc.field("B") == pc.scalar(1)),
        columns=["C"],
        batch_size=BATCH_SIZE,
        fragment_readahead=0,
    )
    gc.collect()
    rss_log, arrow_log = [], []
    for i, batch in enumerate(scanner.to_batches()):
        if i >= N_BATCHES:
            break
        _ = batch.column("C")
        pa.default_memory_pool().release_unused()
        gc.collect()
        rss_log.append(get_rss_mb())
        arrow_log.append(get_arrow_mb())


if __name__ == "__main__":
    pa.set_memory_pool(pa.system_memory_pool())
    for i in range(3):
        pa.default_memory_pool().release_unused()
        gc.collect()
        print(f"iter {i}")
        print(f"before rss: {get_rss_mb()} mb, arrow: {get_arrow_mb()} mb")
        with tempfile.TemporaryDirectory() as tmp:
            generate_dataset(tmp)
            run(tmp, "memleak_minimal")
        pa.default_memory_pool().release_unused()
        gc.collect()
        print(f"after rss: {get_rss_mb()} mb, arrow: {get_arrow_mb()} mb")
```

And here is the result:

```bash
(pyarrow-dev) pawel-biegun@pawel-biegun-AORUS-15-9KF:~/pyarrow-dev$ python minimal_working_example.py
iter 0
before rss: 130.4765625 mb, arrow: 0.0 mb
after rss: 159.55859375 mb, arrow: 0.0 mb
iter 1
before rss: 159.55859375 mb, arrow: 0.0 mb
after rss: 158.0625 mb, arrow: 0.0 mb
iter 2
before rss: 158.0625 mb, arrow: 0.0 mb
after rss: 159.26171875 mb, arrow: 0.0 mb
```

RSS stabilizes across iterations instead of growing, which shows that there is no memory leak here. @pitrou what would be a good place in the docs to document this?
