[
https://issues.apache.org/jira/browse/ARROW-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alec Buckenheimer updated ARROW-12872:
--------------------------------------
Component/s: Python
C++ - Plasma
> `pyarrow._plasma.PlasmaClient.delete` behavior undocumented
> -----------------------------------------------------------
>
> Key: ARROW-12872
> URL: https://issues.apache.org/jira/browse/ARROW-12872
> Project: Apache Arrow
> Issue Type: Improvement
> Components: C++ - Plasma, Python
> Affects Versions: 4.0.0
> Reporter: Alec Buckenheimer
> Priority: Major
>
> Hi all,
> I've been using plasma to speed up some multiprocessing I'm doing and have
> run into cases where my plasma server runs out of memory even though my logs
> show I should have plenty of space when accounting for `size_created -
> size_deleted`. The documentation for plasma is [a bit
> scarce|#using-arrow-and-pandas-with-plasma] and does not mention how to use
> the `.delete` method, but I'd expect that calling `.delete([oid])` would free
> up that space immediately, which does not seem to be the case. The tests for
> this are [actually commented
> out|https://github.com/apache/arrow/blob/b4f2c1c72f745acf12e8a6d2d031750745ab2de2/python/pyarrow/tests/test_plasma.py#L592],
> and from some testing of my own I've found that whether delete actually takes
> effect is implicitly tied to a reference count on the buffer: the object only
> gets evicted once every client-side reference to its buffer has been
> released. That kinda makes sense from the zero-copy perspective, but it makes
> it really difficult to keep a lid on the amount of data in the plasma store,
> since it's not immediately clear which refs are hanging around where. Should
> I be explicitly copying buffer data after pulling it from plasma to make
> these deletes happen (and if so, how do I do that)?
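> In the meantime I've been poking at the store to see what is still pinned.
> Here's a minimal sketch of what I'm doing, assuming my reading of the API is
> right that `PlasmaClient.list()` reports a per-object `ref_count` and
> `PlasmaClient.store_capacity()` reports the configured store size:
> {code:python}
> import pyarrow.plasma as pl
>
> def dump_store(cli: pl.PlasmaClient) -> None:
>     # list() returns a dict of ObjectID -> metadata; a ref_count above zero
>     # means some client still holds a buffer into that object (my understanding)
>     used = 0
>     for oid, info in cli.list().items():
>         used += info["data_size"] + info["metadata_size"]
>         print(oid, info["state"], "refs:", info["ref_count"],
>               "bytes:", info["data_size"])
>     print("used:", used, "of", cli.store_capacity())
> {code}
> Objects whose ref_count never drops to zero seem to be exactly the ones that
> delete skips, which matches what I see in the repro below.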
> For what it's worth, I've captured this behavior inline below. If someone
> could tell me whether this is expected and whether there's an easy
> workaround, I'd really appreciate it. I'm sure the docs could use a bit of
> love too.
>
>
> {code:python}
> import pyarrow as pa
> import pyarrow.plasma as pl
>
> def table_to_plasma(
>     table: pa.Table,
>     cli: pl.PlasmaClient,
> ) -> pl.ObjectID:
>     batches = table.to_batches()
>     size = sum(b.nbytes for b in batches)
>     # actual buffer space is a tiny bit more than the size of the tables so
>     # add some wiggle room
>     size = int(max(size * 1.01, size + 512))
>     oid = pl.ObjectID.from_random()
>     buf = cli.create(oid, size)
>     writer = pa.ipc.new_stream(
>         pa.FixedSizeBufferWriter(buf), batches[0].schema
>     )
>     for b in batches:
>         writer.write_batch(b)
>     writer.close()
>     cli.seal(oid)
>     return oid
>
> def table_from_plasma(
>     oid: pl.ObjectID,
>     cli: pl.PlasmaClient,
> ) -> pa.Table:
>     buf = cli.get_buffers([oid])
>     assert len(buf) == 1
>     buf = buf[0]
>     stream = pa.ipc.open_stream(buf)
>     return stream.read_all()
>
> def test():
>     t = pa.table([pa.array([1])], schema=pa.schema([pa.field('a', pa.int64())]))
>     with pl.start_plasma_store(int(1e8)) as (pl_name, pl_proc):
>         cli = pl.connect(pl_name)
>         oid = table_to_plasma(t, cli)
>         t2 = table_from_plasma(oid, cli)
>         assert len(t2) == len(t)
>         cli.delete([oid])
>         assert not cli.contains(oid)  # this unexpectedly fails
>         del t2
>         import gc
>         gc.collect()
>         assert not cli.contains(oid)  # this succeeds
> {code}
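> The workaround I'm currently considering (just a sketch of my own, not
> something the docs describe) is to force a copy of the table into
> process-local memory when reading, so nothing keeps referencing the plasma
> buffer and delete can take effect. I'm assuming an IPC round-trip through a
> `pa.BufferOutputStream` is enough to break the zero-copy link:
> {code:python}
> import pyarrow as pa
> import pyarrow.plasma as pl
>
> def table_from_plasma_copy(
>     oid: pl.ObjectID,
>     cli: pl.PlasmaClient,
> ) -> pa.Table:
>     # zero-copy view over the plasma buffer
>     [buf] = cli.get_buffers([oid])
>     table = pa.ipc.open_stream(buf).read_all()
>     # round-trip through an in-process buffer so the returned table no longer
>     # references plasma memory (assumption: this forces an actual copy)
>     sink = pa.BufferOutputStream()
>     with pa.ipc.new_stream(sink, table.schema) as writer:
>         writer.write_table(table)
>     return pa.ipc.open_stream(sink.getvalue()).read_all()
> {code}
> If I swap this in for `table_from_plasma` in the repro above, my expectation
> is that `cli.delete([oid])` would evict the object without the explicit
> `del t2` / `gc.collect()`, at the cost of an extra copy, but I'd like
> confirmation that this is the intended way to use the API.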
--
This message was sent by Atlassian Jira
(v8.3.4#803005)