guozhans commented on issue #40738:
URL: https://github.com/apache/arrow/issues/40738#issuecomment-2016835221
Hi @kyle-ip,
Thanks for the info. I hadn't tried that before, but the issue still occurred after I did. It shows up in both multi-threaded and multi-process environments; I don't know whether it only behaves correctly under a single thread. The issue occurs only when I set **n_workers** to more than one.
Result:
```
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    39    394.6 MiB    394.6 MiB           1   @profile
    40                                         def to_parquet(df: pd.DataFrame, filename: str):
    41    386.2 MiB     -8.4 MiB           1       table = Table.from_pandas(df)
    42    386.2 MiB      0.0 MiB           1       pool = pa.default_memory_pool()
    43    401.2 MiB     15.0 MiB           1       parquet.write_table(table, filename, compression="snappy")
    44    401.2 MiB      0.0 MiB           1       del table
    45    401.2 MiB      0.0 MiB           1       pool.release_unused()
```
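In addition to the RSS numbers above, I can also print what Arrow's own memory pool reports around the write, to tell allocator retention apart from a genuine leak. A minimal sketch (`bytes_allocated()`, `max_memory()` and `backend_name` are the standard pyarrow `MemoryPool` APIs; the rest mirrors the function above):
```python
import pandas as pd
import pyarrow as pa
from pyarrow import Table, parquet


def to_parquet_with_pool_stats(df: pd.DataFrame, filename: str):
    pool = pa.default_memory_pool()
    print("backend:", pool.backend_name, "before:", pool.bytes_allocated(), "bytes")
    table = Table.from_pandas(df)
    parquet.write_table(table, filename, compression="snappy")
    del table
    pool.release_unused()
    # If bytes_allocated() drops back while RSS stays high, the allocator
    # (jemalloc/mimalloc) is holding freed pages rather than Arrow leaking.
    print("after release_unused:", pool.bytes_allocated(),
          "bytes, peak:", pool.max_memory(), "bytes")
```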
The script to reproduce this issue:
```python
import logging
import os
from concurrent.futures import ThreadPoolExecutor

import dask
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
from dask import delayed
from distributed import WorkerPlugin, Worker, LocalCluster, Client, wait
from loky import ProcessPoolExecutor
from memory_profiler import profile
from pyarrow import Table, parquet


class TaskExecutorPool(WorkerPlugin):
    def __init__(self, logger, name):
        self.logger = logger
        self.worker = None
        self.name = name

    def setup(self, worker: Worker):
        executor = ThreadPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors[self.name] = executor
        self.worker = worker


@profile
def to_parquet(df: pd.DataFrame, filename: str):
    table = Table.from_pandas(df)
    pool = pa.default_memory_pool()
    parquet.write_table(table, filename, compression="snappy")
    del table
    pool.release_unused()


def from_parquet(filename: str):
    return pd.read_parquet(filename)


def main():
    cluster = LocalCluster(n_workers=2, processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
        with dask.annotate(executor="process", retries=10):
            nodes = dd.read_parquet("a parquet file", columns=["id", "tags"])
            os.makedirs("/opt/project/parquet", exist_ok=True)
            for i in range(1, 10):
                dfs = nodes.to_delayed()
                filenames = [os.path.join("/opt/project/parquet", f"nodes-{i}.parquet")
                             for i, df in enumerate(dfs)]
                writes = [delayed(to_parquet)(df, fn) for df, fn in zip(dfs, filenames)]
                dd.compute(*writes)
                wait(writes)


if __name__ == "__main__":
    main()
```
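If the allocator itself turns out to be holding memory, one thing I can also try (an assumption on my part, not something I have verified yet) is forcing the system allocator so freed memory goes back to the OS more eagerly:
```python
import os

# ARROW_DEFAULT_MEMORY_POOL must be set before pyarrow is imported;
# valid values include "system", "jemalloc" and "mimalloc".
os.environ.setdefault("ARROW_DEFAULT_MEMORY_POOL", "system")

import pyarrow as pa

# Alternatively, switch the default pool at runtime.
pa.set_memory_pool(pa.system_memory_pool())
print(pa.default_memory_pool().backend_name)  # expected: "system"
```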