devinjdangelo opened a new pull request, #7791:
URL: https://github.com/apache/arrow-datafusion/pull/7791
## Which issue does this PR close?
Closes #5383
Closes #7767
Progresses towards #7744
## Rationale for this change
Currently, we use the partitioning of the input plan to determine how files
are written out. This has a number of drawbacks, all stemming from the fact
that we have limited information at planning time.
- The input plan may have many empty partitions, causing many empty files to
be written (#5383)
- We cannot determine correct hive-style partitioning at planning time (#7744)
If, instead, `DataSink`s are responsible for partitioning a single input
stream at execution time, we gain finer-grained control over how data is laid
out into files.
## What changes are included in this PR?
This PR introduces an execution-time demux task, which takes a single
`SendableRecordBatchStream` and divides it into a dynamic number of output
streams, which are then serialized to independent files. The division of the
input stream is currently determined exclusively by the number of rows we want
in each file, but it could be made more sophisticated in the future (e.g. for
#7744).
```
                                                                              ┌───────────┐               ┌────────────┐   ┌─────────────┐
                                                                     ┌──────▶ │  batch 1  ├────▶...──────▶│  Batch a   │   │ Output File1│
                                                                     │        └───────────┘               └────────────┘   └─────────────┘
                                                                     │
                                                 ┌──────────┐        │        ┌───────────┐               ┌────────────┐   ┌─────────────┐
┌───────────┐               ┌────────────┐       │          │        ├──────▶ │ batch a+1 ├────▶...──────▶│  Batch b   │   │ Output File2│
│  batch 1  ├────▶...──────▶│  Batch N   ├─────▶ │  Demux   ├────────┤ ...    └───────────┘               └────────────┘   └─────────────┘
└───────────┘               └────────────┘       │          │        │
                                                 └──────────┘        │        ┌───────────┐               ┌────────────┐   ┌─────────────┐
                                                                     └──────▶ │  batch d  ├────▶...──────▶│  Batch n   │   │ Output FileN│
                                                                              └───────────┘               └────────────┘   └─────────────┘
```
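The row-count rule described above can be sketched as a pure function. This is a minimal illustration, not the PR's code: it assumes a hypothetical `Batch` struct standing in for Arrow's `RecordBatch`, a hypothetical `demux_by_row_count` helper, and that batches are never split, so the row limit is a soft one. Because a file is opened only when a batch actually arrives for it, no empty file can be produced.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: only the row count matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub num_rows: usize,
}

/// Hypothetical helper: assign each input batch to an output file.
/// A new file is opened lazily, only when a batch arrives and the current file
/// already holds at least `max_rows_per_file` rows, so no empty file is created.
/// Batches are never split, so a file may exceed the limit by up to one batch.
pub fn demux_by_row_count(batches: &[Batch], max_rows_per_file: usize) -> Vec<Vec<Batch>> {
    let mut files: Vec<Vec<Batch>> = Vec::new();
    let mut rows_in_current: usize = 0;
    for batch in batches {
        // Open a new file if none is open yet or the current one is "full".
        if files.is_empty() || rows_in_current >= max_rows_per_file {
            files.push(Vec::new());
            rows_in_current = 0;
        }
        files.last_mut().unwrap().push(batch.clone());
        rows_in_current += batch.num_rows;
    }
    files
}
```

With `max_rows_per_file = 8`, batches of 4, 4, 4, 4 and 1 rows land in three files holding 8, 8 and 1 rows; an empty input yields no files at all.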
To accomplish this, the demux task shares a channel with the caller over which
it sends further channels, one per output file, each carrying `RecordBatch`es.
These are the key types:
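```rust
use arrow::record_batch::RecordBatch;
use object_store::path::Path;
use tokio::sync::mpsc::Receiver;
type RecordBatchReceiver = Receiver<RecordBatch>; // batches for one output file
type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>; // (file path, batch stream) pairs
```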
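To make the channel-of-channels protocol concrete, here is a minimal std-only sketch of the demux side. It is an illustration under stated assumptions, not the PR's implementation: a hypothetical `Batch` struct replaces `RecordBatch`, `String` paths replace `object_store::path::Path`, `std::sync::mpsc` channels and threads replace async tasks, and the `part-N.parquet` naming and `spawn_demux` function are hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for an Arrow `RecordBatch`; only the row count matters here.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub num_rows: usize,
}

/// Simplified analogues of the PR's aliases (std channels, `String` paths).
pub type RecordBatchReceiver = Receiver<Batch>;
pub type DemuxedStreamReceiver = Receiver<(String, RecordBatchReceiver)>;

/// Spawn a demux thread that splits `input` into one channel per output file.
/// A (path, receiver) pair is announced only when a batch actually needs a new
/// file, so the caller never receives an empty stream.
pub fn spawn_demux(
    input: Receiver<Batch>,
    max_rows_per_file: usize,
) -> (JoinHandle<()>, DemuxedStreamReceiver) {
    let (file_tx, file_rx) = channel();
    let handle = thread::spawn(move || {
        let mut current: Option<Sender<Batch>> = None;
        let mut rows_in_current: usize = 0;
        let mut file_idx: usize = 0;
        for batch in input {
            if current.is_none() || rows_in_current >= max_rows_per_file {
                let (tx, rx) = channel();
                // Hypothetical naming scheme for output files.
                let path = format!("part-{file_idx}.parquet");
                file_idx += 1;
                if file_tx.send((path, rx)).is_err() {
                    return; // the caller hung up
                }
                // Replacing the sender drops the old one, closing the previous file's stream.
                current = Some(tx);
                rows_in_current = 0;
            }
            rows_in_current += batch.num_rows;
            if current.as_ref().unwrap().send(batch).is_err() {
                return; // the writer for this file hung up
            }
        }
        // Returning drops `current` and `file_tx`, which ends both kinds of stream.
    });
    (handle, file_rx)
}
```

Because the demux writes to only one file channel at a time and drops that channel's sender before moving on, a caller can even drain the streams one after another without deadlocking.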
The caller of the demux task is responsible for consuming a variable number
of `RecordBatchReceiver`s. `DataSink`s will generally want to spawn an
independent task per `RecordBatchReceiver` that serializes its batches and
writes them to the `ObjectStore` writer for the `Path` returned alongside it
by the demux.
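The consumer side described above can be sketched the same way. This is a hedged, std-only illustration: `Batch` and `write_all` are hypothetical, threads stand in for spawned tasks, and "writing" is reduced to summing row counts where a real `DataSink` would serialize each batch and upload it to the object store location for `path`.

```rust
use std::sync::mpsc::Receiver;
use std::thread;

/// Hypothetical stand-in for an Arrow `RecordBatch`.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub num_rows: usize,
}

/// Hypothetical consumer: spawn one writer thread per (path, receiver) pair
/// announced by the demux, so files are written in parallel.
pub fn write_all(demuxed: Receiver<(String, Receiver<Batch>)>) -> Vec<(String, usize)> {
    let mut writers = Vec::new();
    // Iterating the outer receiver ends once the demux drops its sender.
    for (path, batches) in demuxed {
        writers.push(thread::spawn(move || {
            // Stand-in for serialize-and-upload: just count the rows received.
            let rows: usize = batches.iter().map(|b| b.num_rows).sum();
            (path, rows)
        }));
    }
    // Join in spawn order so results line up with the order files were announced.
    writers
        .into_iter()
        .map(|w| w.join().expect("writer thread panicked"))
        .collect()
}
```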
Several config options are added to let the user control the tradeoff
between memory use (how much data is buffered) and parallelism.
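One plausible way to bound buffering, shown here purely as an assumption and not as the PR's actual config mechanism, is a bounded per-file channel: its capacity (the hypothetical `max_buffered` parameter) caps how many batches can sit in memory for one file, and once it is full the producer must wait for the writer. The sketch uses `std::sync::mpsc::sync_channel` and `try_send` to show where that limit kicks in; `accepted_before_full` is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: push up to `n` items into a channel bounded at
/// `max_buffered` without a consumer draining it, and report how many
/// were accepted before the channel reported `Full`.
pub fn accepted_before_full(max_buffered: usize, n: usize) -> usize {
    // Keep the receiver alive (but idle) so sends fail with Full, not Disconnected.
    let (tx, _rx) = sync_channel::<usize>(max_buffered);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer is full: a real producer would block here until the writer catches up.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A larger capacity lets the producer run further ahead of slow writers at the cost of memory; a smaller one keeps memory bounded but stalls the producer sooner.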
## Are these changes tested?
Yes, by existing tests.
## Are there any user-facing changes?
Empty files are no longer written out.