guillaume-rochette-oxb commented on issue #48962: URL: https://github.com/apache/arrow/issues/48962#issuecomment-3812126696
Hi @AlenkaF, I have re-read the documentation about the design and concepts of [Table](https://arrow.apache.org/docs/python/data.html#tables) and [RecordBatch](https://arrow.apache.org/docs/python/data.html#record-batches), as well as the RecordBatch slicing capabilities, and I believe I understand them; I am already using these zero-copy slicing capabilities in the associated [PR](https://github.com/apache/arrow/pull/48963). However, I am sorry that I did not sufficiently describe or motivate the need to re-stack streams of `pa.RecordBatch`, so let me try to remedy this.

For conciseness, let us define the *size* and *mass* of a `pa.RecordBatch`, a `pa.Table`, or a `pad.Dataset` as their number of rows and their number of bytes, respectively.

Let's consider a simple data processing pipeline. First, batches are read from a source dataset whose cumulative mass may be larger than the available memory, say a few terabytes or even petabytes. Then, some arbitrary business logic is applied to the incoming batches. Finally, the outgoing batches are written to a destination dataset. Let's consider the following snippet:

```python
import itertools

import pyarrow as pa
import pyarrow.dataset as pad


def foo(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Some imaginary business logic is applied to the RecordBatch, altering its content
    # by adding, removing, combining columns, etc.
    return batch


def main(src_path: str, dst_path: str, batch_size: int):
    # Source -> Iterable[RecordBatch]
    # We read the input batches from the source dataset.
    iterable = pad.dataset(source=src_path).to_batches(batch_size=batch_size)

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We apply a transformation to the batches.
    iterable = map(foo, iterable)

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We extract the schema from the first output batch and re-join it to the iterable.
    batch = next(iter(iterable))
    schema = batch.schema
    iterable = itertools.chain([batch], iterable)
    batch = None

    # Iterable[RecordBatch] -> Sink
    # We write the output batches to the destination dataset.
    pad.write_dataset(iterable, dst_path, format="parquet", schema=schema)


if __name__ == "__main__":
    main(src_path="/path/to/src/dataset", dst_path="/path/to/dst/dataset", batch_size=16)
```

In the above snippet, let's ignore the fact that `write_dataset()` would currently write a single Parquet file with 2^20 rows per row group, which might not be optimal, for obvious reasons. The `pad.Dataset.to_batches()` method lets us set a static batch size for the `Iterable` of `pa.RecordBatch`, but that is a poor lever for controlling the mass of the incoming `pa.RecordBatch` under an arbitrary `pa.Schema`. This is exacerbated when the `pa.Schema` contains columns with a variable-length type, such as `pa.ListArray`, `pa.StringArray`, or `pa.BinaryArray`, and worsened further when the rows of such a column have a high variance w.r.t. their number of elements. In such cases, `pad.Dataset.to_batches(batch_size=batch_size)` can yield an `Iterable` of `pa.RecordBatch` with heavily fluctuating masses, e.g. the first batch weighs 100 MiB but the second weighs 1 GiB. This results in hard-to-predict, if not erratic, edge cases w.r.t. the overall memory footprint of the pipeline.
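To make the size/mass distinction concrete, here is a minimal sketch that simply measures both quantities per incoming batch; the path and the batch size are placeholders:

```python
import pyarrow.dataset as pad

# Illustrative only: observe how the mass (bytes) of fixed-size (rows) batches
# fluctuates for a schema containing variable-length columns.
dataset = pad.dataset(source="/path/to/src/dataset")
for batch in dataset.to_batches(batch_size=65_536):
    # The size (num_rows) is capped by batch_size, but the mass (nbytes) is not:
    # with skewed list/string/binary columns, two batches of identical size can
    # differ in mass by orders of magnitude.
    print(f"size={batch.num_rows} rows, mass={batch.nbytes} bytes")
```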
Moreover, we can quantify the expansion factor of the `foo()` processing function: let `i` be an incoming `pa.RecordBatch` of mass `n`, and `o = foo(i)` the outgoing `pa.RecordBatch` of mass `m`; the expansion factor is then `e = m / n`. If `e > 1`, `foo()` is dilating; conversely, if `e < 1`, it is contracting. Akin to [Big O notation](https://en.wikipedia.org/wiki/Big_O_notation), we can derive its worst-, average-, and best-case memory usage scenarios. Obviously, compounding the fluctuations in mass with a potential dilation worsens the overall stability of the memory footprint, which leads to conservative, and inefficient, measures being taken, such as setting an intentionally low batch size or oversizing the processing machines.

Therefore, I suggest introducing functionality enabling the dynamic resizing of an `Iterable` of `pa.RecordBatch` given minimums and maximums in terms of size and mass, in order to gain better control over the memory footprint. The snippet would look like:

```python
import itertools

import pyarrow as pa
import pyarrow.dataset as pad


def foo(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Some imaginary business logic is applied to the RecordBatch, altering its content
    # by adding, removing, combining columns, etc.
    return batch


def main(
    src_path: str,
    dst_path: str,
    min_rows_per_batch: int,
    max_rows_per_batch: int,
    min_bytes_per_batch: int,
    max_bytes_per_batch: int,
):
    # Source -> Iterable[RecordBatch]
    # We read the input batches from the source dataset.
    iterable = pad.dataset(source=src_path).to_batches()

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We resize the incoming batches to fit constraints in size and mass.
    iterable = pa.restack_batches(
        iterable,
        min_rows_per_batch=min_rows_per_batch,
        max_rows_per_batch=max_rows_per_batch,
        min_bytes_per_batch=min_bytes_per_batch,
        max_bytes_per_batch=max_bytes_per_batch,
    )

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We apply a transformation to the batches.
    iterable = map(foo, iterable)

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We extract the schema from the first output batch and re-join it to the iterable.
    batch = next(iter(iterable))
    schema = batch.schema
    iterable = itertools.chain([batch], iterable)
    batch = None

    # Iterable[RecordBatch] -> Sink
    # We write the output batches to the destination dataset.
    pad.write_dataset(iterable, dst_path, format="parquet", schema=schema)


if __name__ == "__main__":
    main(
        src_path="/path/to/src/dataset",
        dst_path="/path/to/dst/dataset",
        min_rows_per_batch=0,
        max_rows_per_batch=2 ** (10 + 10),  # 1M rows per batch
        min_bytes_per_batch=0,
        max_bytes_per_batch=2 ** (10 + 10 + 9),  # 512 MiB per batch
    )
```
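For clarity, here is a rough, pure-Python sketch of the re-stacking semantics I have in mind. It is not the PR implementation: `restack_batches_sketch` is a hypothetical name, it ignores the minimum bounds for brevity, it enforces the row ceiling strictly but uses the byte ceiling only as a flush trigger, and it relies solely on public APIs (`RecordBatch.nbytes`, `Table.from_batches()`, `Table.combine_chunks()`, `Table.to_batches()`):

```python
from typing import Iterable, Iterator, List

import pyarrow as pa


def restack_batches_sketch(
    batches: Iterable[pa.RecordBatch],
    max_rows_per_batch: int,
    max_bytes_per_batch: int,
) -> Iterator[pa.RecordBatch]:
    # Buffer incoming batches until either ceiling is crossed, then consolidate
    # the buffer into contiguous memory and re-slice it into row-bounded batches.
    buffer: List[pa.RecordBatch] = []
    buffered_rows, buffered_bytes = 0, 0
    for batch in batches:
        buffer.append(batch)
        buffered_rows += batch.num_rows
        buffered_bytes += batch.nbytes
        if buffered_rows >= max_rows_per_batch or buffered_bytes >= max_bytes_per_batch:
            table = pa.Table.from_batches(buffer).combine_chunks()
            # Table.to_batches() slices zero-copy; only combine_chunks() copies.
            yield from table.to_batches(max_chunksize=max_rows_per_batch)
            buffer, buffered_rows, buffered_bytes = [], 0, 0
    if buffer:
        table = pa.Table.from_batches(buffer).combine_chunks()
        yield from table.to_batches(max_chunksize=max_rows_per_batch)
```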
Arguably, we could also use the `restack_batches()` function to resize the `Iterable` of outgoing `pa.RecordBatch`, and slightly change how we write the destination dataset so as to produce multiple smaller Parquet files, such that:

```python
import itertools
from typing import Iterable

import pyarrow as pa
import pyarrow.dataset as pad
import pyarrow.parquet as pap


def foo(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Some imaginary business logic is applied to the RecordBatch, altering its content
    # by adding, removing, combining columns, etc.
    return batch


def write_dataset(
    iterable: Iterable[pa.RecordBatch],
    path: str,
):
    # We write each outgoing batch as its own Parquet file, one row group per file.
    for index, batch in enumerate(iterable):
        pap.write_table(
            table=pa.Table.from_batches(batches=[batch], schema=batch.schema),
            where=f"{path}/{index:012d}.parquet",
            row_group_size=batch.num_rows,
            compression="snappy",
        )


def main(
    src_path: str,
    dst_path: str,
    min_rows_per_batch: int,
    max_rows_per_batch: int,
    min_bytes_per_batch: int,
    max_bytes_per_batch: int,
):
    # Source -> Iterable[RecordBatch]
    # We read the input batches from the source dataset.
    iterable = pad.dataset(source=src_path).to_batches()

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We resize the incoming batches to fit constraints in size and mass.
    iterable = pa.restack_batches(
        iterable,
        min_rows_per_batch=min_rows_per_batch,
        max_rows_per_batch=max_rows_per_batch,
        min_bytes_per_batch=min_bytes_per_batch,
        max_bytes_per_batch=max_bytes_per_batch,
    )

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We apply a transformation to the batches.
    iterable = map(foo, iterable)

    # Iterable[RecordBatch] -> Iterable[RecordBatch]
    # We resize the outgoing batches to fit constraints in size and mass.
    iterable = pa.restack_batches(
        iterable,
        min_rows_per_batch=min_rows_per_batch,
        max_rows_per_batch=max_rows_per_batch,
        min_bytes_per_batch=min_bytes_per_batch,
        max_bytes_per_batch=max_bytes_per_batch,
    )

    # Iterable[RecordBatch] -> Sink
    # We write the output batches to the destination dataset, but this time in smaller files.
    write_dataset(iterable, path=dst_path)


if __name__ == "__main__":
    main(
        src_path="/path/to/src/dataset",
        dst_path="/path/to/dst/dataset",
        min_rows_per_batch=0,
        max_rows_per_batch=2 ** (10 + 10),  # 1M rows per batch
        min_bytes_per_batch=0,
        max_bytes_per_batch=2 ** (10 + 10 + 9),  # 512 MiB per batch
    )
```

One fair point of criticism raised in the [PR](https://github.com/apache/arrow/pull/48963#issuecomment-3809876144) is that the `restack_batches()` function is written in Python rather than C++ and is most likely sub-par in terms of efficiency, but in my opinion that is a starting point rather than the end of the discussion. Another point of criticism is that it would be more efficient to fuse the resizing of the batches with the reading, e.g. in the `pad.Dataset.to_batches()` method, and with the writing, e.g. in the `pad.write_dataset()` function (for reference, the closest existing knobs on the writing side are sketched below).

PS: It seems that I do not have the permission to re-open the issue, could you please re-open it?
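For reference, the closest existing knobs on the writing side appear to be the row-count-based parameters of `pad.write_dataset()`; they constrain the size of output files and row groups, but to my knowledge there is no mass-based (bytes) equivalent today. A minimal sketch, with illustrative paths and values:

```python
import pyarrow.dataset as pad

# Row-count-based knobs only: these bound the size of output files and row
# groups, not their mass in bytes.
src = pad.dataset(source="/path/to/src/dataset")
pad.write_dataset(
    data=src.to_batches(),
    base_dir="/path/to/dst/dataset",
    format="parquet",
    schema=src.schema,
    max_rows_per_file=2 ** (10 + 10),   # at most 2^20 rows per output file
    min_rows_per_group=2 ** (10 + 4),   # at least 2^14 rows per row group
    max_rows_per_group=2 ** (10 + 10),  # at most 2^20 rows per row group
)
```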
