Dandandan commented on a change in pull request #1029:
URL: https://github.com/apache/arrow-datafusion/pull/1029#discussion_r712357580
##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -308,8 +310,38 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>
{
- let stream = execute_stream(plan).await?;
- common::collect(stream).await
+ let stream = execute_stream(plan.clone()).await?;
+ let any_plan = plan.as_any().downcast_ref::<UnionExec>();
+ match any_plan {
+ Some(&UnionExec { .. }) => {
+ let record_batches = common::collect(stream).await;
+ if any_plan.unwrap().is_all() {
+ return record_batches;
+ }
+ let mut new_record_batches = Vec::new();
+ let mut vec_str = Vec::new();
+ for record_batch in record_batches.unwrap() {
+ for _row in 0..record_batch.num_rows() {
+ let mut array_str = String::new();
+ let mut vec_array = Vec::new();
+ for col in 0..record_batch.num_columns() {
+ let column = record_batch.column(col);
+ array_str += &*array_value_to_string(column, 1)?;
+ vec_array.push(column.clone());
+ }
+ if vec_str.contains(&array_str) {
Review comment:
Because the code keeps every row in a `Vec` and checks each new row with
`contains`, it also scales badly: `O(n^2)` instead of `O(n)` in the number of
rows. Besides that, it probably has very high overhead from string formatting,
plus the memory needed to keep all batches and all rows converted to strings.
I think a cleaner and more efficient way to go would be to reuse the structure
we already have to drive the execution:
https://github.com/apache/arrow-datafusion/issues/998#issuecomment-922975719
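To illustrate the complexity point only, here is a minimal standalone sketch. It is not DataFusion code and not the approach proposed in the linked issue; the function names and the `Vec<i64>` row representation are hypothetical stand-ins for the rows being deduplicated. The first function mirrors the `Vec` + `contains` pattern, the second does the same work with a `HashSet`, which needs one expected constant-time lookup per row.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the pattern in the diff: every `contains`
/// call scans all rows kept so far, so n rows cost O(n^2) comparisons.
fn dedup_with_vec(rows: &[Vec<i64>]) -> Vec<Vec<i64>> {
    let mut seen: Vec<Vec<i64>> = Vec::new();
    for row in rows {
        if !seen.contains(row) {
            seen.push(row.clone());
        }
    }
    seen
}

/// Same result using a hash set: `insert` returns `false` when the row
/// was already present, so each row costs one expected O(1) lookup.
fn dedup_with_hash_set(rows: &[Vec<i64>]) -> Vec<Vec<i64>> {
    let mut seen: HashSet<&Vec<i64>> = HashSet::new();
    let mut out = Vec::new();
    for row in rows {
        if seen.insert(row) {
            // First occurrence: keep it, preserving input order.
            out.push(row.clone());
        }
    }
    out
}
```

Both functions return the first occurrence of each distinct row in input order; only the cost of the membership check differs. The sketch does not address the string-formatting and memory overhead mentioned above.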