jorisvandenbossche commented on PR #34249:
URL: https://github.com/apache/arrow/pull/34249#issuecomment-1468028906

   I added python bindings for this to give it a go (I could push those here, 
if you want).
   
   I was curious to check that it actually sorts the full dataset (and not just per batch), and to see how it performs. Doing:
   
   ```python
   import numpy as np
   import pyarrow as pa
   import pyarrow.compute as pc
   from pyarrow._acero import OrderByNodeOptions, TableSourceNodeOptions, Declaration
   
   arr1 = np.arange(10_000_000)
   arr2 = np.arange(10_000_000)
   np.random.shuffle(arr2)
   table = pa.table({'a': arr1, 'b': arr2})
   table_chunked = pa.Table.from_batches(table.to_batches(max_chunksize=1_000_000))
   
   decl1 = Declaration.from_sequence([
       Declaration("table_source", TableSourceNodeOptions(table)),
       Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
   ])
   
   %time res1 = decl1.to_table()
   
   
   decl2 = Declaration.from_sequence([
       Declaration("table_source", TableSourceNodeOptions(table_chunked)),
       Declaration("order_by", OrderByNodeOptions([("b", "ascending")]))
   ])
   %time res2 = decl2.to_table()
   # CPU times: user 7.4 s, sys: 123 ms, total: 7.52 s
   # Wall time: 7.52 s
   ```
   
   Both those timings take around 7.5s on my laptop (release build). So whether the exec plan runs on a single-chunk Table or a chunked one doesn't seem to matter in practice, I assume because Acero still splits the input into smaller exec batches anyway.
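   
   As a quick sanity check (a minimal sketch on top of the snippet above, assuming `res1`/`res2` are the tables produced by the two plans), we can verify the result is ordered across the whole table and peek at how the output is chunked:
   
   ```python
   import numpy as np
   
   # "b" was a shuffled arange, so after sorting by "b" it should be a plain arange again
   assert np.array_equal(res1["b"].to_numpy(), np.arange(10_000_000))
   assert np.array_equal(res2["b"].to_numpy(), np.arange(10_000_000))
   
   # peek at the output chunking -- I would expect many small exec batches here
   print(res1["b"].num_chunks, res2["b"].num_chunks)
   ```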
   
   Now, comparing this with the "manual" way of sorting the data using the compute kernel directly (which is also how `Table.sort_by` is currently implemented), that is quite a bit faster:
   
   ```
   In [4]: %%time
      ...: indices = pc.sort_indices(table, [("b", "ascending")])
      ...: table.take(indices)
   CPU times: user 2.39 s, sys: 135 ms, total: 2.53 s
   Wall time: 2.54 s
   
   In [5]: %%time
      ...: indices = pc.sort_indices(table_chunked, [("b", "ascending")])
      ...: table_chunked.take(indices)
   CPU times: user 4.27 s, sys: 83.6 ms, total: 4.35 s
   Wall time: 4.35 s
   ```
   
   So here it actually matters whether the table is chunked or not (I assume that in this case the compute kernel uses the chunking as it is present in the data, and doesn't automatically switch to a smaller batch size).
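   
   For reference, a small sketch of the chunk layouts the kernel presumably works with (the counts are what I would expect from how the tables were constructed above):
   
   ```python
   # table was built from plain numpy arrays -> one chunk per column
   print(table["b"].num_chunks)
   # table_chunked was rebatched with max_chunksize=1_000_000 -> should be 10 chunks
   print(table_chunked["b"].num_chunks)
   ```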
   
   And when manually using a similarly small batch size as Acero uses, we get more or less the same timing as above (~7.5s):
   
   ```
   In [11]: table_chunked2 = pa.Table.from_batches(table.to_batches(max_chunksize=32768))
   
   In [12]: %%time
       ...: indices = pc.sort_indices(table_chunked2, [("b", "ascending")])
       ...: table_chunked2.take(indices)
   CPU times: user 7.53 s, sys: 98.6 ms, total: 7.63 s
   Wall time: 7.63 s
   ```
   
   This might all be expected (I think sorting a chunked table is done by first sorting each chunk and then "merging" the indices? That certainly sounds like it would add some overhead compared to sorting one larger array in a single go). But it also means that if you have in-memory data (or a dataset that easily fits in memory) and there is an order_by node in your pipeline, it might be beneficial to set a larger exec batch size?
   
   For example, if the `Table.sort_by` implementation were to rely on an ExecPlan (which is actually what we did at first, assuming it would be more efficient (https://github.com/apache/arrow/pull/14976#discussion_r1054485541), but I changed it back to sort_indices/take to fix the minimal build tests), we would need to specify a large batch size, otherwise it would be a lot slower than it could be.
   
   For in-memory data like a Table, it's actually faster to first concatenate (copy) the data into a single contiguous chunk and then sort, than to keep the existing chunks:
   
   ```
   In [29]: %%time
       ...: table_concatted = pa.Table.from_arrays([pa.concat_arrays(table_chunked2["a"].chunks), pa.concat_arrays(table_chunked2["b"].chunks)], schema=table_chunked2.schema)
       ...: indices = pc.sort_indices(table_concatted, [("b", "ascending")])
       ...: table_concatted.take(indices)
   CPU times: user 2.74 s, sys: 132 ms, total: 2.87 s
   Wall time: 2.88 s
   ```
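   
   As an aside, a more succinct way to get the same single-chunk table should be `Table.combine_chunks()` (a small sketch; I would expect the timing to be comparable to the manual `concat_arrays` version above):
   
   ```python
   # copies the data into one contiguous chunk per column, equivalent to the
   # manual concat_arrays above for these int64 columns
   table_concatted = table_chunked2.combine_chunks()
   indices = pc.sort_indices(table_concatted, [("b", "ascending")])
   table_concatted.take(indices)
   ```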
   
   This is getting a bit off-topic for this PR (sorry!), but it probably means we should expose a way to configure the exec batch size? (Inferring it automatically when there is an order_by node in the plan is probably difficult to do in general.) This isn't exposed in the python bindings yet (so I can't test it), but I also didn't directly see how to do it in C++. `QueryOptions` doesn't seem to have such an argument. In exec.h there is a `kDefaultExecChunksize` defined, but I don't see it actually being used anywhere.
   `ExecContext` has a `set_exec_chunksize`, but its default is max<int64>, which is much larger than what is actually used.
   
   
   

