[ 
https://issues.apache.org/jira/browse/ARROW-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alec Buckenheimer updated ARROW-12872:
--------------------------------------
    Labels: documentation usability  (was: )

> `pyarrow._plasma.PlasmaClient.delete` behavior undocumented
> -----------------------------------------------------------
>
>                 Key: ARROW-12872
>                 URL: https://issues.apache.org/jira/browse/ARROW-12872
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++ - Plasma, Python
>    Affects Versions: 4.0.0
>            Reporter: Alec Buckenheimer
>            Priority: Major
>              Labels: documentation, usability
>
> Hi all,
> I've been using Plasma to speed up some multiprocessing work and have run 
> into cases where my Plasma server runs out of memory even though my logs 
> suggest I should have plenty of space, accounting for `size_created - 
> size_deleted`. The documentation for Plasma is [a bit 
> scarce|#using-arrow-and-pandas-with-plasma] and does not mention how to use 
> the `.delete` method, but I'd expect that calling `.delete([oid])` would 
> free that space immediately, which does not seem to be the case. The tests 
> for this are [actually commented 
> out|https://github.com/apache/arrow/blob/b4f2c1c72f745acf12e8a6d2d031750745ab2de2/python/pyarrow/tests/test_plasma.py#L592],
>  and from some testing of my own I've found that whether delete actually 
> takes effect is implicitly tied to a reference counter on the buffer. This 
> makes some sense from the zero-copy perspective, but it makes it really 
> difficult to keep a lid on the amount of data in the Plasma store, since 
> it's not immediately clear which refs are hanging around where. Should I be 
> explicitly copying buffer data after pulling it from Plasma to make these 
> deletes happen (and if so, how do I do that)?
> For what it's worth, I've captured this behavior inline below. If someone 
> could tell me whether this is expected and whether there's an easy 
> workaround, I'd really appreciate it. I'm sure the docs could use a bit of 
> love too.
>  
>  
> {code:python}
> import pyarrow as pa
> import pyarrow.plasma as pl
>
> def table_to_plasma(
>     table: pa.Table,
>     cli: pl.PlasmaClient,
> ) -> pl.ObjectID:
>     batches = table.to_batches()
>     size = sum(b.nbytes for b in batches)
>     # actual buffer space is a tiny bit more than the size of the tables so
>     # add some wiggle room
>     size = int(max(size * 1.01, size + 512))
>     oid = pl.ObjectID.from_random()
>     buf = cli.create(oid, size)
>     writer = pa.ipc.new_stream(
>         pa.FixedSizeBufferWriter(buf), batches[0].schema
>     )
>     for b in batches:
>         writer.write_batch(b)
>     writer.close()
>     cli.seal(oid)
>     return oid
>
> def table_from_plasma(
>     oid: pl.ObjectID,
>     cli: pl.PlasmaClient,
> ) -> pa.Table:
>     buf = cli.get_buffers([oid])
>     assert len(buf) == 1
>     buf = buf[0]
>     stream = pa.ipc.open_stream(buf)
>     return stream.read_all()
>
> def test():
>     t = pa.table([pa.array([1])], schema=pa.schema([pa.field('a', pa.int64())]))
>     with pl.start_plasma_store(int(1e8)) as (pl_name, pl_proc):
>         cli = pl.connect(pl_name)
>         oid = table_to_plasma(t, cli)
>         t2 = table_from_plasma(oid, cli)
>         assert len(t2) == len(t)
>         cli.delete([oid])
>         assert not cli.contains(oid)  # this unexpectedly fails
>         del t2
>         import gc
>         gc.collect()
>         assert not cli.contains(oid)  # this succeeds
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
