alamb commented on code in PR #7261:
URL: https://github.com/apache/arrow-datafusion/pull/7261#discussion_r1290549127
##########
datafusion/core/src/physical_plan/sorts/stream.rs:
##########
@@ -108,26 +109,47 @@ impl RowCursorStream {
let converter = RowConverter::new(sort_fields)?;
Ok(Self {
converter,
- reservation,
- column_expressions: expressions.iter().map(|x|
x.expr.clone()).collect(),
+ expressions: expressions.to_vec(),
streams: FusedStreams(streams),
+ reservation,
+ schema: schema.clone(),
})
}
fn convert_batch(&mut self, batch: &RecordBatch) -> Result<RowCursor> {
- let cols = self
- .column_expressions
+ let column_expressions: Vec<_> = self.expressions.iter().map(|x|
x.expr.clone()).collect();
+ let cols = column_expressions
.iter()
.map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;
- let rows = self.converter.convert_columns(&cols)?;
- self.reservation.try_resize(self.converter.size())?;
+ let sort_fields = self
+ .expressions
+ .iter()
+ .map(|expr| {
+ let data_type = expr.expr.data_type(&self.schema)?;
+ Ok(SortField::new_with_options(data_type, expr.options))
+ })
+ .collect::<Result<Vec<_>>>()?;
- // track the memory in the newly created Rows.
+ let old_converter: &mut RowConverter = &mut self.converter;
+ let mut old_rows = old_converter.convert_columns(&cols)?;
+ self.reservation.try_resize(old_converter.size())?;
let mut rows_reservation = self.reservation.new_empty();
- rows_reservation.try_grow(rows.size())?;
- Ok(RowCursor::new(rows, rows_reservation))
+ rows_reservation.try_grow(old_rows.size())?;
+
+ println!("Old converter size: {0}", old_converter.size());
+ if old_converter.size() > 50*1024*1024 {
+ let mut new_converter = RowConverter::new(sort_fields)?;
+ let new_rows = new_converter.convert_columns(
+ &old_converter.convert_rows(&old_rows)?
+ )?;
+ rows_reservation.try_resize(new_rows.size())?;
+ old_rows = new_rows;
+ println!("Swapped old converter of size: {0} with new converter of
size {1}", old_converter.size(), new_converter.size());
+ self.converter = new_converter;
+ }
+ Ok(RowCursor::new(old_rows, rows_reservation))
Review Comment:
I believe you will also have to arrange, somehow, to rewrite all the outstanding
existing `Rows` in the other streams too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]