guozhans commented on issue #40738:
URL: https://github.com/apache/arrow/issues/40738#issuecomment-2016835221

   Hi @kyle-ip,
   Thanks for the info. I hadn't tried that before, but the issue still occurred after I did.
   It can be seen in both multi-threaded and multi-process environments, and I don't know
   whether it only behaves correctly under a single thread. The issue occurs only when I set
   **n_workers** to more than one.
   
   Result:
   ```
   Line #    Mem usage    Increment  Occurrences   Line Contents
   =============================================================
       39    394.6 MiB    394.6 MiB           1   @profile
       40                                         def to_parquet(df: pd.DataFrame, filename: str):
       41    386.2 MiB     -8.4 MiB           1       table = Table.from_pandas(df)
       42    386.2 MiB      0.0 MiB           1       pool = pa.default_memory_pool()
       43    401.2 MiB     15.0 MiB           1       parquet.write_table(table, filename, compression="snappy")
       44    401.2 MiB      0.0 MiB           1       del table
       45    401.2 MiB      0.0 MiB           1       pool.release_unused()
   ```
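   
   To separate what the Arrow memory pool still holds from what the process RSS shows, the write function can also log the pool statistics directly. This is just a sketch of how I would instrument it; `bytes_allocated()`, `max_memory()` and `backend_name` are the existing `pyarrow.MemoryPool` accessors, and the rest is only logging:
   ```python
   import pyarrow as pa
   from pyarrow import Table, parquet
   
   
   def to_parquet_with_stats(df, filename):
       # Same write path as to_parquet() above, but report the Arrow pool state afterwards.
       pool = pa.default_memory_pool()
       table = Table.from_pandas(df)
       parquet.write_table(table, filename, compression="snappy")
       del table
       pool.release_unused()
       # bytes_allocated() is what the pool still holds after release_unused();
       # max_memory() is the peak it reached during the write.
       print(f"pool={pool.backend_name} "
             f"allocated={pool.bytes_allocated()} max={pool.max_memory()}")
   ```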
   
   The script to reproduce this issue:
   ```python
   
   import logging
   import os
   from concurrent.futures import ThreadPoolExecutor
   
   import dask
   import dask.dataframe as dd
   import pandas as pd
   import pyarrow as pa
   from dask import delayed
   
   from distributed import WorkerPlugin, Worker, LocalCluster, Client, wait
   from loky import ProcessPoolExecutor
   from memory_profiler import profile
   from pyarrow import Table, parquet
   
   
   class TaskExecutorPool(WorkerPlugin):
       def __init__(self, logger, name):
           self.logger = logger
           self.worker = None
           self.name = name
   
       def setup(self, worker: Worker):
           executor = ThreadPoolExecutor(max_workers=worker.state.nthreads)
           worker.executors[self.name] = executor
           self.worker = worker
   
   
   @profile
   def to_parquet(df: pd.DataFrame, filename: str):
       table = Table.from_pandas(df)
       pool = pa.default_memory_pool()
       parquet.write_table(table, filename, compression="snappy")
       del table
       pool.release_unused()
   
   
   def from_parquet(filename: str):
       return pd.read_parquet(filename)
   
   
   def main():
       cluster = LocalCluster(n_workers=2, processes=False, silence_logs=logging.DEBUG)
       with Client(cluster) as client:
           client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
           with dask.annotate(executor="process", retries=10):
               nodes = dd.read_parquet("a parquet file", columns=["id", "tags"])
               os.makedirs("/opt/project/parquet", exist_ok=True)
               for i in range(1, 10):
                   dfs = nodes.to_delayed()
                   filenames = [os.path.join("/opt/project/parquet", f"nodes-{i}.parquet") for i, df in enumerate(dfs)]
                   writes = [delayed(to_parquet)(df, fn) for df, fn in zip(dfs, filenames)]
                   dd.compute(*writes)
                   wait(writes)
   
   
   if __name__ == "__main__":
       main()
   ```
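   
   For comparison, a stripped-down sketch without Dask that exercises the same write path from two plain threads, to check whether the growth follows the thread count rather than anything Dask-specific. The input path `input.parquet` is just a placeholder; `release_unused()` and `bytes_allocated()` are the same pyarrow calls used above:
   ```python
   import os
   from concurrent.futures import ThreadPoolExecutor
   
   import pandas as pd
   import pyarrow as pa
   from pyarrow import Table, parquet
   
   
   def write_once(df: pd.DataFrame, filename: str) -> int:
       # Same steps as to_parquet() above, returning what the pool still holds.
       table = Table.from_pandas(df)
       parquet.write_table(table, filename, compression="snappy")
       del table
       pool = pa.default_memory_pool()
       pool.release_unused()
       return pool.bytes_allocated()
   
   
   if __name__ == "__main__":
       df = pd.read_parquet("input.parquet")  # placeholder input file
       os.makedirs("parquet", exist_ok=True)
       with ThreadPoolExecutor(max_workers=2) as executor:
           futures = [
               executor.submit(write_once, df, os.path.join("parquet", f"nodes-{i}.parquet"))
               for i in range(10)
           ]
           for future in futures:
               print("bytes still held by Arrow pool:", future.result())
   ```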
   
   
   
   
   
   

