alamb opened a new pull request #909: URL: https://github.com/apache/arrow-datafusion/pull/909
# Which issue does this PR close?

Built on https://github.com/apache/arrow-datafusion/pull/908, so please review that one first.

Part of https://github.com/apache/arrow-datafusion/issues/866

# Rationale for this change

I would like to be able to get an overall understanding of where time is spent during query execution via `EXPLAIN ANALYZE` (see https://github.com/apache/arrow-datafusion/pull/858) so that I know where to focus additional performance optimization work.

Additionally, I would like to be able to graph a stacked flamechart such as the following (see more details in https://github.com/influxdata/influxdb_iox/issues/2273) that shows when the different operators ran in relation to each other.

<img width="689" alt="Screen Shot 2021-08-12 at 11 14 33 AM" src="https://user-images.githubusercontent.com/490673/129237447-834838c8-aa97-42c4-b905-6114d28ca98b.png">

# What changes are included in this PR?

Begin adding the following data for each operator, as it makes sense:

1. `output_rows`: the total number of rows produced at the output of the operator
2. `cpu_nanos`: the total time spent computing (not including any time spent in the input stream or waiting to be scheduled)
3. `start_time`: the wall clock time at which `execute` was run
4. `stop_time`: the wall clock time at which the last output record batch was produced

# Changes

1. Adds a `BaselineMetrics` structure that holds the common metrics, to assist in annotating operators (see the sketch below)
2. Adds a `Timestamp` metric type, `StartTimestamp` and `EndTimestamp` values, and the associated aggregating code
3. Prints out metrics in a deterministic order
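To make the intent concrete for reviewers, here is a minimal, self-contained sketch of the bookkeeping an operator has to do for these four metrics. All names and signatures below are illustrative only; they are not the actual types added in this PR:

```rust
// Illustrative sketch only: names and signatures are hypothetical,
// not the actual API added in this PR.
use std::time::{Instant, SystemTime};

#[derive(Default)]
struct BaselineSketch {
    /// total rows produced at the output of the operator
    output_rows: usize,
    /// total time spent doing the operator's own work
    cpu_nanos: u64,
    /// wall clock time at which `execute` was run
    start_time: Option<SystemTime>,
    /// wall clock time at which the last output batch was produced
    end_time: Option<SystemTime>,
}

impl BaselineSketch {
    /// Called once at the top of `execute`
    fn record_start(&mut self) {
        self.start_time = Some(SystemTime::now());
    }

    /// Wraps each unit of the operator's own work, so time spent in
    /// the input stream or waiting to be scheduled is not counted
    fn record_cpu<T>(&mut self, work: impl FnOnce() -> T) -> T {
        let timer = Instant::now();
        let out = work();
        self.cpu_nanos += timer.elapsed().as_nanos() as u64;
        out
    }

    /// Called for each output batch; the last call stamps `end_time`
    fn record_output(&mut self, num_rows: usize) {
        self.output_rows += num_rows;
        self.end_time = Some(SystemTime::now());
    }
}
```

An operator would call `record_start` once, wrap each poll of its own computation in `record_cpu`, and call `record_output` per batch; bundling this up in one struct is what avoids repeating the bookkeeping in every operator.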
# Are there any user-facing changes?

Better `EXPLAIN ANALYZE` output. Using this setup:

```shell
echo "1,A" > /tmp/foo.csv
echo "1,B" >> /tmp/foo.csv
echo "2,A" >> /tmp/foo.csv
```

Run the CLI:

```shell
cargo run --bin datafusion-cli
```

And then run this SQL:

```SQL
CREATE EXTERNAL TABLE foo(x INT, b VARCHAR) STORED AS CSV LOCATION '/tmp/foo.csv';
SELECT SUM(x) FROM foo GROUP BY b;
EXPLAIN ANALYZE SELECT SUM(x) FROM foo GROUP BY b;
```

## Before this PR

```
> EXPLAIN ANALYZE SELECT SUM(x) FROM foo GROUP BY b;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                      |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[] |
|                   |   ProjectionExec: expr=[SUM(foo.x)@1 as SUM(x)], metrics=[] |
|                   |     HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[outputRows=2] |
|                   |       CoalesceBatchesExec: target_batch_size=4096, metrics=[] |
|                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[sendTime=968487, repartitionTime=5686072, fetchTime=110114033] |
|                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[outputRows=2] |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[sendTime=12090, fetchTime=5106669, repartitionTime=0] |
|                   |               CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false, metrics=[] |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
```

## After this PR (note the presence of `output_rows`, `cpu_time`, `start_timestamp`, and `end_timestamp`)

```
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                              |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec, metrics=[output_rows=2, cpu_time=NOT RECORDED, start_timestamp=2021-08-20 17:10:51.810651 UTC, end_timestamp=2021-08-20 17:10:51.821645 UTC] |
|                   |   ProjectionExec: expr=[SUM(foo.x)@1 as SUM(x)], metrics=[] |
|                   |     HashAggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[SUM(x)], metrics=[output_rows=2] |
|                   |       CoalesceBatchesExec: target_batch_size=4096, metrics=[] |
|                   |         RepartitionExec: partitioning=Hash([Column { name: "b", index: 0 }], 16), metrics=[fetch_time{inputPartition=2}=138.444053ms, repart_time{inputPartition=2}=7.445086ms, send_time{inputPartition=2}=NOT RECORDED] |
|                   |           HashAggregateExec: mode=Partial, gby=[b@1 as b], aggr=[SUM(x)], metrics=[output_rows=2] |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[send_time{inputPartition=0}=9.378µs, fetch_time{inputPartition=0}=6.57685ms, repart_time{inputPartition=0}=NOT RECORDED] |
|                   |               CsvExec: source=Path(/tmp/foo.csv: [/tmp/foo.csv]), has_header=false, metrics=[] |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set. Query took 0.019 seconds.
```

# Follow on work

1. Annotate additional operators
2. Add an additional wrapper, as suggested by @houqp in https://github.com/apache/arrow-datafusion/issues/866#issuecomment-898202701, to help avoid special-case code for each operator. `BaselineMetrics` is a start in this direction, but there is more to do.
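As an aside on item 2 of the Changes list above: when per-partition metrics are combined into the per-operator values shown in the `EXPLAIN ANALYZE` output, the two timestamp kinds need opposite merge semantics. Here is a rough sketch of that aggregation, under the assumption that start timestamps combine as "earliest wins" and end timestamps as "latest wins"; the types and names are illustrative, not this PR's code:

```rust
// Illustrative merge logic for the timestamp metrics; assumes start
// timestamps aggregate as "earliest wins" and end timestamps as
// "latest wins" across partitions.
use std::time::SystemTime;

#[derive(Clone, Copy)]
enum TimestampMetric {
    /// when the operator's `execute` began (per partition)
    Start(Option<SystemTime>),
    /// when the operator produced its last batch (per partition)
    End(Option<SystemTime>),
}

impl TimestampMetric {
    /// Fold another partition's value into this one
    fn aggregate(&mut self, other: &Self) {
        match (self, other) {
            (TimestampMetric::Start(a), TimestampMetric::Start(b)) => {
                // overall start = earliest partition start
                *a = merge(*a, *b, SystemTime::min);
            }
            (TimestampMetric::End(a), TimestampMetric::End(b)) => {
                // overall end = latest partition end
                *a = merge(*a, *b, SystemTime::max);
            }
            _ => {} // mismatched kinds are left alone
        }
    }
}

/// Combine two optional timestamps, preferring whichever `pick` selects
fn merge(
    a: Option<SystemTime>,
    b: Option<SystemTime>,
    pick: fn(SystemTime, SystemTime) -> SystemTime,
) -> Option<SystemTime> {
    match (a, b) {
        (Some(x), Some(y)) => Some(pick(x, y)),
        (x, y) => x.or(y),
    }
}
```

This is what makes the aggregated `start_timestamp`/`end_timestamp` pair usable for the flamechart shown above: together they span the full wall clock interval during which any partition of the operator was running.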
