alamb commented on a change in pull request #1029:
URL: https://github.com/apache/arrow-datafusion/pull/1029#discussion_r712406143
##########
File path: datafusion/src/physical_plan/mod.rs
##########
@@ -308,8 +310,38 @@ pub fn visit_execution_plan<V: ExecutionPlanVisitor>(
/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>
{
- let stream = execute_stream(plan).await?;
- common::collect(stream).await
+ let stream = execute_stream(plan.clone()).await?;
+ let any_plan = plan.as_any().downcast_ref::<UnionExec>();
+ match any_plan {
+ Some(&UnionExec { .. }) => {
+ let record_batches = common::collect(stream).await;
+ if any_plan.unwrap().is_all() {
+ return record_batches;
+ }
+ let mut new_record_batches = Vec::new();
+ let mut vec_str = Vec::new();
+ for record_batch in record_batches.unwrap() {
+ for _row in 0..record_batch.num_rows() {
+ let mut array_str = String::new();
+ let mut vec_array = Vec::new();
+ for col in 0..record_batch.num_columns() {
+ let column = record_batch.column(col);
+ array_str += &*array_value_to_string(column, 1)?;
+ vec_array.push(column.clone());
+ }
+ if vec_str.contains(&array_str) {
Review comment:
I agree with @Dandandan -- specifically, I think you could make a plan
that implements `SELECT x FROM foo UNION SELECT x FROM bar` by effectively
creating the same plan as
`SELECT DISTINCT x FROM (SELECT x FROM foo UNION ALL SELECT x FROM bar)`.
You can see the plan that gets made by running `EXPLAIN`:
```
explain select distinct x from ( select 1 as x UNION ALL select 1 as x);
+---------------+------------------------------------------------------------------------------+
| plan_type     | plan                                                                         |
+---------------+------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[#x]], aggr=[[]]                                         |
|               |   Union                                                                      |
|               |     Projection: Int64(1) AS x                                                |
|               |       EmptyRelation                                                          |
|               |     Projection: Int64(1) AS x                                                |
|               |       EmptyRelation                                                          |
| physical_plan | HashAggregateExec: mode=FinalPartitioned, gby=[x@0 as x], aggr=[]            |
|               |   CoalesceBatchesExec: target_batch_size=4096                                |
|               |     RepartitionExec: partitioning=Hash([Column { name: "x", index: 0 }], 16) |
|               |       HashAggregateExec: mode=Partial, gby=[x@0 as x], aggr=[]               |
|               |         UnionExec                                                            |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16)                  |
|               |             ProjectionExec: expr=[1 as x]                                    |
|               |               EmptyExec: produce_one_row=true                                |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16)                  |
|               |             ProjectionExec: expr=[1 as x]                                    |
|               |               EmptyExec: produce_one_row=true                                |
+---------------+------------------------------------------------------------------------------+
```
(so use a `UnionExec` followed by `HashAggregateExec`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]