alamb opened a new pull request #909:
URL: https://github.com/apache/arrow-datafusion/pull/909


   # Which issue does this PR close?
   
   Built on https://github.com/apache/arrow-datafusion/pull/908, so please review that one first.
   
   Part of https://github.com/apache/arrow-datafusion/issues/866
   
   # Rationale for this change
   I would like to be able to get an overall understanding of where time is spent during query execution via `EXPLAIN ANALYZE` (see https://github.com/apache/arrow-datafusion/pull/858) so that I know where to focus additional performance optimization work.
   
   Additionally, I would like to be able to graph a stacked flamechart such as the following (see more details in https://github.com/influxdata/influxdb_iox/issues/2273) that shows when the different operators ran in relation to each other.
   
   <img width="689" alt="Screen Shot 2021-08-12 at 11 14 33 AM" src="https://user-images.githubusercontent.com/490673/129237447-834838c8-aa97-42c4-b905-6114d28ca98b.png">
   
   
   # What changes are included in this PR?
   Begin adding the following data for each operator, where it makes sense (a minimal sketch of this shape follows the list):
   1. output_rows: total rows produced at the output of the operator
   2. cpu_nanos: the total time spent (not including any time spent in the 
input stream or waiting to be scheduled)
   3. start_time: the wall clock time at which `execute` was run
   4. stop_time: the wall clock time at which the last output record batch was 
produced
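   
   To make the shape of this data concrete, here is a minimal, self-contained sketch of one way these four values could fit together per operator. The names (`OperatorMetrics`, `record_batch`) are illustrative only, not DataFusion's actual API:
   
   ```rust
   use std::time::{Duration, SystemTime};
   
   // Illustrative only: mirrors the list above, not DataFusion's real types.
   #[derive(Debug, Default)]
   struct OperatorMetrics {
       /// 1. Total rows produced at the output of the operator.
       output_rows: usize,
       /// 2. Total time spent, excluding time spent polling the input
       ///    stream or waiting to be scheduled.
       cpu_nanos: u64,
       /// 3. Wall clock time at which `execute` was run.
       start_time: Option<SystemTime>,
       /// 4. Wall clock time at which the last output batch was produced.
       stop_time: Option<SystemTime>,
   }
   
   impl OperatorMetrics {
       /// Record one output batch of `rows` rows that took `cpu` time to produce.
       fn record_batch(&mut self, rows: usize, cpu: Duration) {
           self.start_time.get_or_insert_with(SystemTime::now);
           self.output_rows += rows;
           self.cpu_nanos += cpu.as_nanos() as u64;
           self.stop_time = Some(SystemTime::now());
       }
   }
   
   fn main() {
       let mut metrics = OperatorMetrics::default();
       metrics.record_batch(2, Duration::from_micros(150));
       println!("{:?}", metrics);
   }
   ```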
   
   # Changes
   1. Adds a `BaselineMetrics` structure that holds the common metrics, to make annotating operators easier
   2. Adds a `Timestamp` metric type with `StartTimestamp` and `EndTimestamp` values, plus the associated aggregation code (see the sketch after this list)
   3. Prints metrics out in a deterministic order
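   
   For change 2, here is a hedged sketch of what a timestamp-valued metric and its aggregation across partitions might look like. The merge rules (earliest start wins, latest end wins) and all names are my own illustration of the idea, not the PR's exact code:
   
   ```rust
   use std::time::SystemTime;
   
   // Illustrative sketch: a timestamp-valued metric whose aggregation rule
   // differs for start timestamps (earliest wins) and end timestamps
   // (latest wins).
   #[derive(Debug, Default, Clone, Copy)]
   struct Timestamp(Option<SystemTime>);
   
   impl Timestamp {
       /// Set this metric to the current wall clock time.
       fn record(&mut self) {
           self.0 = Some(SystemTime::now());
       }
   
       /// Combine as a start timestamp: keep the earliest of the two.
       fn merge_start(&mut self, other: Timestamp) {
           self.0 = match (self.0, other.0) {
               (Some(a), Some(b)) => Some(a.min(b)),
               (a, b) => a.or(b),
           };
       }
   
       /// Combine as an end timestamp: keep the latest of the two.
       fn merge_end(&mut self, other: Timestamp) {
           self.0 = match (self.0, other.0) {
               (Some(a), Some(b)) => Some(a.max(b)),
               (a, b) => a.or(b),
           };
       }
   }
   
   fn main() {
       let (mut start, mut end) = (Timestamp::default(), Timestamp::default());
       start.record();
       end.record();
       // Merging a partition that never recorded leaves the value unchanged.
       start.merge_start(Timestamp::default());
       end.merge_end(Timestamp::default());
       println!("start={:?} end={:?}", start, end);
   }
   ```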
   
   # Are there any user-facing changes?
   Yes: better `EXPLAIN ANALYZE` output.
   
   Using this setup:
   
   ```shell
   echo "1,A" > /tmp/foo.csv
   echo "1,B" >> /tmp/foo.csv
   echo "2,A" >> /tmp/foo.csv
   ```
   
   Run the CLI:
   ```shell
   cargo run --bin datafusion-cli
   ```
   
   And then run this SQL:
   ```SQL
   CREATE EXTERNAL TABLE foo(x INT, b VARCHAR) STORED AS CSV LOCATION '/tmp/foo.csv';
   
   SELECT SUM(x) FROM foo GROUP BY b;
   
   EXPLAIN ANALYZE SELECT SUM(x) FROM foo GROUP BY b;
   
   ```
   
   ## Before this PR
   ```
   > EXPLAIN ANALYZE SELECT SUM(x) FROM foo GROUP BY b;
   +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan                                                                                                                                                      |
   +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | CoalescePartitionsExec, metrics=[]                                                                                                                        |
   |                   |   ProjectionExec: expr=[SUM(foo.x)@1 as SUM(x)], metrics=[]                                                                                               |
   |                   |     HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                       |
   |                   |       CoalesceBatchesExec: target_batch_size=4096, metrics=[]                                                                                             |
   |                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=968487, repartitionTime=5686072, fetchTime=110114033] |
   |                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2]                                                          |
   |                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[sendTime=12090, fetchTime=5106669, repartitionTime=0]                             |
   |                   |               CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false, metrics=[]                                                            |
   +-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   ## After this PR
   (note the presence of `output_rows`, `cpu_time`, `start_timestamp`, and `end_timestamp`)
   
   ```
   +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan                                                                                                                                                                                                                      |
   +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=2, cpu_time=NOT RECORDED, start_timestamp=2021-08-20 17:10:51.810651 UTC, end_timestamp=2021-08-20 17:10:51.821645 UTC]                                                      |
   |                   |   ProjectionExec: expr=[SUM(foo.x)@1 as SUM(x)], metrics=[]                                                                                                                                                               |
   |                   |     HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[output_rows=2]                                                                                                                      |
   |                   |       CoalesceBatchesExec: target_batch_size=4096, metrics=[]                                                                                                                                                             |
   |                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[fetch_time{inputPartition=2}=138.444053ms, repart_time{inputPartition=2}=7.445086ms, send_time{inputPartition=2}=NOT RECORDED] |
   |                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[output_rows=2]                                                                                                                         |
   |                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[send_time{inputPartition=0}=9.378µs, fetch_time{inputPartition=0}=6.57685ms, repart_time{inputPartition=0}=NOT RECORDED]                          |
   |                   |               CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false, metrics=[]                                                                                                                            |
   +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   1 row in set. Query took 0.019 seconds.
   ```
   
   # Follow-on work
   1. Annotate additional operators
   2. Add an additional wrapper, as suggested by @houqp in https://github.com/apache/arrow-datafusion/issues/866#issuecomment-898202701, to help avoid special-casing each operator (a sketch of the idea follows). `BaselineMetrics` is a start in this direction, but there is still more to do.
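   
   To illustrate the wrapper idea, here is a hypothetical sketch: a synchronous `Iterator` over `Vec<i64>` stands in for DataFusion's async stream of `RecordBatch`es, and all names are made up for the example:
   
   ```rust
   // Hypothetical sketch: wrap any operator's output once, so baseline
   // metrics are recorded without per-operator code.
   struct MetricsWrapper<I> {
       inner: I,
       output_rows: usize,
   }
   
   impl<I: Iterator<Item = Vec<i64>>> Iterator for MetricsWrapper<I> {
       type Item = Vec<i64>;
   
       fn next(&mut self) -> Option<Self::Item> {
           let batch = self.inner.next()?;
           // Count rows as batches flow through, invisibly to the wrapped operator.
           self.output_rows += batch.len();
           Some(batch)
       }
   }
   
   fn main() {
       let batches = vec![vec![1, 2], vec![3]];
       let mut wrapped = MetricsWrapper { inner: batches.into_iter(), output_rows: 0 };
       for _batch in wrapped.by_ref() {}
       assert_eq!(wrapped.output_rows, 3);
       println!("output_rows = {}", wrapped.output_rows);
   }
   ```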
   

