guillaume-rochette-oxb commented on issue #48962:
URL: https://github.com/apache/arrow/issues/48962#issuecomment-3812126696

   Hi @AlenkaF,
   
   I have re-read the documentation about the design and the concept of 
[Table](https://arrow.apache.org/docs/python/data.html#tables), 
[RecordBatch](https://arrow.apache.org/docs/python/data.html#record-batches) 
and the RecordBatch's slicing capabilities, and I believe that I understand 
them, moreover I am already using these zero-copy slicing capabilities in the 
associated [PR](https://github.com/apache/arrow/pull/48963).
   
   However, I am sorry that I did not sufficiently describe or motivate the need to re-stack streams of `pa.RecordBatch`, so I will try to remedy this here.
   
   For conciseness, we define the size and mass of a `pa.RecordBatch`, a `pa.Table`, or a `pad.Dataset` as its number of rows and its number of bytes, respectively.
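   Concretely, for a `pa.RecordBatch` (and similarly for a `pa.Table`) these two notions map onto the `num_rows` and `nbytes` attributes, e.g.
   ```python
   import pyarrow as pa

   batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3]})
   batch.num_rows  # size: 3 rows
   batch.nbytes    # mass: number of bytes consumed by the batch's buffers
   ```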
   
   Let's consider a simple pipeline for data processing.
   First, reading batches from a source dataset, whose cumulative mass could be 
larger than the available memory, say a few terabytes or even petabytes.
   Then, applying some arbitrary business logic to the incoming batches.
   And finally, writing the outgoing batches to a destination dataset.
   
   Let's consider the following snippet,
   ```python
   import itertools
   
   import pyarrow as pa
   import pyarrow.dataset as pad
   
   
   def foo(batch: pa.RecordBatch) -> pa.RecordBatch:
       # Some imaginary business logic is applied to the RecordBatch,
       # altering its content by adding, removing, combining columns, etc.
       return batch
   
   
   def main(src_path: str, dst_path: str, batch_size: int):
       # Source -> Iterable[RecordBatch]
       # We read the input batches from the source dataset.
       iterable = pad.dataset(source=src_path).to_batches(batch_size=batch_size)
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We apply a transformation to the batches
       iterable = map(foo, iterable)
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We extract the schema from the first output batch
       # and re-join it to the iterable.
       batch = next(iter(iterable))
       schema = batch.schema
       iterable = itertools.chain([batch], iterable)
       batch = None
       # Iterable[RecordBatch] -> Sink
       # We write the output batches to the destination dataset.
       pad.write_dataset(iterable, dst_path, format="parquet", schema=schema)
   
   
   if __name__ == '__main__':
       main(
           src_path="/path/to/src/dataset",
           dst_path="/path/to/dst/dataset",
           batch_size=16,
       )
   
   ```
   In the above snippet, let's ignore the fact that the `write_dataset()` function would currently write to a single Parquet file containing 2^20 rows per row group, which might not be optimal for obvious reasons.
   
   The `pad.Dataset.to_batches()` method allows us to set a static batch size for the `Iterable` of `pa.RecordBatch`, but a fixed number of rows is a poor proxy for controlling the mass of the incoming `pa.RecordBatch` under an arbitrary `pa.Schema`.
   This is exacerbated when the `pa.Schema` contains a column of a variable-length type, such as `pa.ListArray`, `pa.StringArray`, `pa.BinaryArray`, etc., and is worse still when the number of elements per row in such a column has a high variance.
   In such cases, `pad.Dataset.to_batches(batch_size=batch_size)` could yield an `Iterable` of `pa.RecordBatch` with heavily fluctuating masses, e.g. the first batch has a mass of 100 MiB, but the second has a mass of 1 GiB.
   This results in hard-to-predict, if not erratic, edge cases w.r.t. the overall memory footprint of the pipeline.
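   As a toy illustration (with made-up data), two batches of identical size can have wildly different masses once a variable-length column is involved:
   ```python
   import pyarrow as pa

   small = pa.RecordBatch.from_pydict({"s": ["a", "b"]})
   large = pa.RecordBatch.from_pydict({"s": ["a" * 50_000_000, "b" * 50_000_000]})
   # Same size (2 rows each)...
   print(small.num_rows, large.num_rows)  # 2 2
   # ...but masses several orders of magnitude apart.
   print(small.nbytes, large.nbytes)      # tens of bytes vs. ~100 MB
   ```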
   
   Moreover, we can quantify the expansion factor of the `foo()` processing function: let `i` be an incoming `pa.RecordBatch` of mass `n`, and `o = foo(i)` the outgoing `pa.RecordBatch` of mass `m`; the expansion factor is then `e = m / n`.
   If `e > 1`, `foo()` is dilating; conversely, if `e < 1`, it is contracting.
   Akin to [Big O notation](https://en.wikipedia.org/wiki/Big_O_notation), we can derive its worst-, average- and best-case memory usage scenarios.
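   For instance, given an `Iterable` of incoming `pa.RecordBatch` (before `foo()` is applied) and using `nbytes` as the mass, the expansion factor on a single batch could be measured as follows (a sketch for illustration, not part of the pipeline):
   ```python
   i = next(iterable)       # incoming RecordBatch, mass n = i.nbytes
   o = foo(i)               # outgoing RecordBatch, mass m = o.nbytes
   e = o.nbytes / i.nbytes  # expansion factor e = m / n
   ```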
   
   Obviously, compounding the fluctuations in mass with a potential dilation worsens the overall stability of the memory footprint, resulting in conservative and inefficient measures being taken, such as setting an intentionally low batch size or over-provisioning the processing machines.
   
   Therefore, I suggest introducing a functionality enabling the dynamic resizing of an `Iterable` of `pa.RecordBatch`, given minimums and maximums in terms of size and mass, in order to have better control of the memory footprint.
   
   The snippet would look like,
   ```python
   import itertools
   
   import pyarrow as pa
   import pyarrow.dataset as pad
   
   
   def foo(batch: pa.RecordBatch) -> pa.RecordBatch:
       # Some imaginary business logic is applied to the RecordBatch,
       # altering its content by adding, removing, combining columns, etc.
       return batch
   
   
   def main(
           src_path: str,
           dst_path: str,
           min_rows_per_batch: int,
           max_rows_per_batch: int,
           min_bytes_per_batch: int,
           max_bytes_per_batch: int,
   ):
       # Source -> Iterable[RecordBatch]
       # We read the input batches from the source dataset.
       iterable = pad.dataset(source=src_path).to_batches()
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We resize the incoming batches to fit constraints in size and mass.
       iterable = pa.restack_batches(
           iterable,
           min_rows_per_batch=min_rows_per_batch,
           max_rows_per_batch=max_rows_per_batch,
           min_bytes_per_batch=min_bytes_per_batch,
           max_bytes_per_batch=max_bytes_per_batch,
       )
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We apply a transformation to the batches
       iterable = map(foo, iterable)
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We extract the schema from the first output batch
       # and re-join it to the iterable.
       batch = next(iter(iterable))
       schema = batch.schema
       iterable = itertools.chain([batch], iterable)
       batch = None
       # Iterable[RecordBatch] -> Sink
       # We write the output batches to the destination dataset.
       pad.write_dataset(iterable, dst_path, format="parquet", schema=schema)
   
   
   if __name__ == "__main__":
       main(
           src_path="/path/to/src/dataset",
           dst_path="/path/to/dst/dataset",
           min_rows_per_batch=0,
           max_rows_per_batch=2 ** (10 + 10),  # 1M rows per batch
           min_bytes_per_batch=0,
           max_bytes_per_batch=2 ** (10 + 10 + 9),  # 512 MiB per batch
       )
   
   ```
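   For reference, a rough pure-Python sketch of what the resizing logic could do is shown below; this is purely illustrative and is not the code of the PR, and the `min_*` constraints are left out for brevity (they would merely delay flushing until the minima are met):
   ```python
   from typing import Iterable, Iterator

   import pyarrow as pa


   def restack_batches_sketch(
           iterable: Iterable[pa.RecordBatch],
           max_rows_per_batch: int,
           max_bytes_per_batch: int,
   ) -> Iterator[pa.RecordBatch]:
       pending: list[pa.RecordBatch] = []
       pending_rows, pending_bytes = 0, 0

       def flush() -> list[pa.RecordBatch]:
           nonlocal pending, pending_rows, pending_bytes
           # Concatenate the pending slices and re-cut them by rows.
           table = pa.Table.from_batches(pending)
           pending, pending_rows, pending_bytes = [], 0, 0
           return table.combine_chunks().to_batches(max_chunksize=max_rows_per_batch)

       for batch in iterable:
           # Slice over-sized incoming batches by rows first (zero-copy).
           for offset in range(0, batch.num_rows, max_rows_per_batch):
               piece = batch.slice(offset, max_rows_per_batch)
               # Flush before the pending slices would exceed either budget.
               # A single slice that alone exceeds the byte budget is passed
               # through as an over-budget batch.
               if pending and (
                   pending_rows + piece.num_rows > max_rows_per_batch
                   or pending_bytes + piece.nbytes > max_bytes_per_batch
               ):
                   yield from flush()
               pending.append(piece)
               pending_rows += piece.num_rows
               pending_bytes += piece.nbytes
       if pending:
           yield from flush()
   ```
   Here `batch.slice()` is zero-copy, while `combine_chunks()` concatenates the pending slices into contiguous columns before re-cutting them, which is where the actual copying would happen; the point is simply that both a row budget and a byte budget are enforced on the yielded batches.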
   
   Arguably, we could also use the `restack_batches()` function to resize the `Iterable` of outgoing `pa.RecordBatch`, and slightly change how we write the destination dataset in order to produce multiple smaller Parquet files, such that,
   ```python
   import itertools
   from typing import Iterable
   
   import pyarrow as pa
   import pyarrow.dataset as pad
   import pyarrow.parquet as pap
   
   
   def foo(batch: pa.RecordBatch) -> pa.RecordBatch:
       # Some imaginary business logic is applied to the RecordBatch,
       # altering its content by adding, removing, combining columns, etc.
       return batch
   
   
   def write_dataset(
           iterable: Iterable[pa.RecordBatch],
           path: str,
   ):
       for index, batch in enumerate(iterable):
           pap.write_table(
               table=pa.Table.from_batches(batches=[batch], schema=batch.schema),
               where=f"{path}/{index:012d}.parquet",
               row_group_size=batch.num_rows,
               compression="snappy",
           )
   
   def main(
           src_path: str,
           dst_path: str,
           min_rows_per_batch: int,
           max_rows_per_batch: int,
           min_bytes_per_batch: int,
           max_bytes_per_batch: int,
   ):
       # Source -> Iterable[RecordBatch]
       # We read the input batches from the source dataset.
       iterable = pad.dataset(source=src_path).to_batches()
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We resize the incoming batches to fit constraints in size and mass.
       iterable = pa.restack_batches(
           iterable,
           min_rows_per_batch=min_rows_per_batch,
           max_rows_per_batch=max_rows_per_batch,
           min_bytes_per_batch=min_bytes_per_batch,
           max_bytes_per_batch=max_bytes_per_batch,
       )
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We apply a transformation to the batches
       iterable = map(foo, iterable)
       # Iterable[RecordBatch] -> Iterable[RecordBatch]
       # We resize the outgoing batches to fit constraints in size and mass.
       iterable = pa.restack_batches(
           iterable,
           min_rows_per_batch=min_rows_per_batch,
           max_rows_per_batch=max_rows_per_batch,
           min_bytes_per_batch=min_bytes_per_batch,
           max_bytes_per_batch=max_bytes_per_batch,
       )
       # Iterable[RecordBatch] -> Sink
       # We write the output batches to the destination dataset,
       # but this time in smaller files.
       write_dataset(iterable, path=dst_path)
   
   
   if __name__ == "__main__":
       main(
           src_path="/path/to/src/dataset",
           dst_path="/path/to/dst/dataset",
           min_rows_per_batch=0,
           max_rows_per_batch=2 ** (10 + 10),  # 1M rows per batch
           min_bytes_per_batch=0,
           max_bytes_per_batch=2 ** (10 + 10 + 9),  # 512 MiB per batch
       )
   
   ```
   
   One fair point of criticism made in the [PR](https://github.com/apache/arrow/pull/48963#issuecomment-3809876144) is that the `restack_batches` function is written in Python and not in C++, and is most likely sub-par w.r.t. its own efficiency, but in my opinion that is a starting point rather than the end of the discussion.
   Another point of criticism would be that it would be more efficient to fuse the resizing of the batches with the reading, e.g. in the `pad.Dataset.to_batches()` method, and with the writing, e.g. in the `pad.write_dataset()` function.
   
   PS: It seems that I do not have permission to re-open the issue; could you please re-open it?

