jorisvandenbossche commented on PR #34249:
URL: https://github.com/apache/arrow/pull/34249#issuecomment-1468028906
I added python bindings for this to give it a go (I could push those here,
if you want).
I was curious to check that it actually sorts the full dataset (and not just
per batch), and how this would then perform. Doing:
```python
import numpy as np  # needed for np.arange / np.random.shuffle below
import pyarrow as pa
import pyarrow.compute as pc
from pyarrow._acero import OrderByNodeOptions, TableSourceNodeOptions, Declaration

arr1 = np.arange(10_000_000)
arr2 = np.arange(10_000_000)
np.random.shuffle(arr2)
table = pa.table({'a': arr1, 'b': arr2})
table_chunked = pa.Table.from_batches(table.to_batches(max_chunksize=1_000_000))

decl1 = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table)),
    Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
])
%time res1 = decl1.to_table()

decl2 = Declaration.from_sequence([
    Declaration("table_source", TableSourceNodeOptions(table_chunked)),
    Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
])
%time res2 = decl2.to_table()
# CPU times: user 7.4 s, sys: 123 ms, total: 7.52 s
# Wall time: 7.52 s
```
Both those timings take around 7.5s on my laptop (release build). So running
the exec plan on a single-chunk Table or on a chunked Table doesn't seem to
matter in practice, I assume because Acero still splits the input into smaller
exec batches anyway.
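To double-check the "sorts the full dataset" part, a quick sanity check (my own sketch, not part of the timings above; it assumes `res1` from the block above):
```python
import numpy as np

# Sanity check (sketch): column "b" of the Acero result should be globally
# sorted, not just sorted within each exec batch.
b = res1["b"].to_numpy()
assert np.all(np.diff(b) >= 0)
# "b" was a shuffled arange, so the sorted result should run from 0 to N-1.
assert b[0] == 0 and b[-1] == len(b) - 1
```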
Now, comparing this with the "manual" way of sorting the data using the
compute kernels directly (which is also how `Table.sort_by` is currently
implemented), that approach is quite a bit faster:
```
In [4]: %%time
   ...: indices = pc.sort_indices(table, [("b", "ascending")])
   ...: table.take(indices)
CPU times: user 2.39 s, sys: 135 ms, total: 2.53 s
Wall time: 2.54 s

In [5]: %%time
   ...: indices = pc.sort_indices(table_chunked, [("b", "ascending")])
   ...: table_chunked.take(indices)
CPU times: user 4.27 s, sys: 83.6 ms, total: 4.35 s
Wall time: 4.35 s
```
So here it actually matters whether the table is chunked or not (I assume in
this case the compute kernel uses the chunking as it is present in the data,
and doesn't automatically split it into smaller batches).
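For reference, the two inputs differ only in their chunk layout (a quick illustration; the chunk counts follow from how the tables were constructed above):
```python
# "table" was built from contiguous numpy arrays, while "table_chunked" was
# rebuilt from batches of 1_000_000 rows, so their chunk layouts differ:
print(table["b"].num_chunks)          # 1
print(table_chunked["b"].num_chunks)  # 10
```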
And when manually using a batch size as small as what Acero uses, we get more
or less the same timing of ~7.5s as above:
```
In [11]: table_chunked2 = pa.Table.from_batches(table.to_batches(max_chunksize=32768))

In [12]: %%time
    ...: indices = pc.sort_indices(table_chunked2, [("b", "ascending")])
    ...: table_chunked2.take(indices)
CPU times: user 7.53 s, sys: 98.6 ms, total: 7.63 s
Wall time: 7.63 s
```
This might all be expected (I think sorting a chunked table is done by first
sorting per chunk and then "merging" the indices? That certainly sounds like it
would add some overhead compared to sorting a single larger array in one go).
But it also means that if you have in-memory data (or a dataset that easily
fits into memory) and you have an order_by node in your pipeline, it might be
beneficial to set a larger exec batch size?
For example, if the `Table.sort_by` implementation were to rely on an ExecPlan
(which is actually what we did at first, assuming it would be more efficient
(https://github.com/apache/arrow/pull/14976#discussion_r1054485541), but I
changed it back to sort_indices/take to fix the minimal build tests), we would
need to specify a large batch size, otherwise it would be a lot slower than it
could be.
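For context, the sort_indices/take approach is roughly what `Table.sort_by` does today (a simplified sketch, not the exact implementation):
```python
import pyarrow.compute as pc

def sort_by(table, sorting):
    # Compute the sorted row order once, then gather the rows in that order;
    # this is the "manual" two-step approach used in the timings above.
    indices = pc.sort_indices(table, sort_keys=sorting)
    return table.take(indices)

res = sort_by(table, [("b", "ascending")])
```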
For in-memory data like a Table, it's actually faster to first concatenate
(copy) the data into a single contiguous chunk and then sort, than to keep the
existing chunks:
```
In [29]: %%time
    ...: table_concatted = pa.Table.from_arrays(
    ...:     [pa.concat_arrays(table_chunked2["a"].chunks),
    ...:      pa.concat_arrays(table_chunked2["b"].chunks)],
    ...:     schema=table_chunked2.schema)
    ...: indices = pc.sort_indices(table_concatted, [("b", "ascending")])
    ...: table_concatted.take(indices)
CPU times: user 2.74 s, sys: 132 ms, total: 2.87 s
Wall time: 2.88 s
```
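As a side note, `Table.combine_chunks()` should give the same contiguous layout more conveniently (a sketch; I haven't timed this variant separately):
```python
# Same idea using the built-in API: combine all chunks of each column into a
# single contiguous chunk before sorting.
table_combined = table_chunked2.combine_chunks()
indices = pc.sort_indices(table_combined, [("b", "ascending")])
result = table_combined.take(indices)
```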
This becomes a bit off-topic for this PR (sorry!), but it probably means we
should expose a way to configure the exec batch size? (Inferring this
automatically when there is an order_by node in the plan is probably difficult
to do in general.) This isn't yet exposed in the python bindings (so I can't
test it), but I also didn't directly see how it is done in C++. `QueryOptions`
doesn't seem to have such an argument. In exec.h, there is a
`kDefaultExecChunksize` defined, but I don't see it actually used anywhere.
ExecContext has a `set_exec_chunksize`, but its default is max<int64>, which is
much larger than what is actually used.