alamb commented on code in PR #5894:
URL: https://github.com/apache/arrow-datafusion/pull/5894#discussion_r1160635957
##########
datafusion/core/src/physical_plan/sorts/builder.rs:
##########
@@ -89,82 +99,37 @@ impl BatchBuilder {
return Ok(None);
}
- // Mapping from stream index to the index of the first buffer from that stream
- let mut buffer_idx = 0;
- let mut stream_to_buffer_idx = Vec::with_capacity(self.batches.len());
-
- for batches in &self.batches {
- stream_to_buffer_idx.push(buffer_idx);
- buffer_idx += batches.len();
- }
-
- let columns = self
- .schema
- .fields()
- .iter()
- .enumerate()
- .map(|(column_idx, field)| {
- let arrays = self
+ let columns = (0..self.schema.fields.len())
+ .map(|column_idx| {
+ let arrays: Vec<_> = self
.batches
.iter()
- .flat_map(|batch| {
- batch.iter().map(|batch| batch.column(column_idx).data())
- })
+ .map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
-
- let mut array_data = MutableArrayData::new(
- arrays,
- field.is_nullable(),
- self.indices.len(),
- );
-
- let first = &self.indices[0];
- let mut buffer_idx =
- stream_to_buffer_idx[first.stream_idx] + first.batch_idx;
- let mut start_row_idx = first.row_idx;
- let mut end_row_idx = start_row_idx + 1;
-
- for row_index in self.indices.iter().skip(1) {
- let next_buffer_idx =
- stream_to_buffer_idx[row_index.stream_idx] + row_index.batch_idx;
-
- if next_buffer_idx == buffer_idx && row_index.row_idx == end_row_idx {
- // subsequent row in same batch
- end_row_idx += 1;
- continue;
- }
-
- // emit current batch of rows for current buffer
- array_data.extend(buffer_idx, start_row_idx, end_row_idx);
-
- // start new batch of rows
- buffer_idx = next_buffer_idx;
- start_row_idx = row_index.row_idx;
- end_row_idx = start_row_idx + 1;
- }
-
- // emit final batch of rows
- array_data.extend(buffer_idx, start_row_idx, end_row_idx);
- make_array(array_data.freeze())
+ Ok(interleave(&arrays, &self.indices)?)
Review Comment:
👌
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]