YjyJeff commented on code in PR #6929:
URL: https://github.com/apache/arrow-datafusion/pull/6929#discussion_r1262232085

##########
datafusion/core/src/physical_plan/repartition/flume_channels.rs:
##########
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Channel based on flume
+
+use flume::r#async::RecvStream;
+use flume::{unbounded, Sender};

Review Comment:
@Dandandan Good point. I have compared the flume channel against the tokio channel on TPC-H.
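To make the Change column in the benchmark tables easier to read: it appears to be the ratio of the two timings (baseline / candidate), with anything inside a roughly 5% noise band reported as "no change". That 5% band is an assumption inferred from the rows themselves (for example, Q15 in the second table, about 4.8% slower, is "no change", while Q20 in the first table, about 6.0% faster, is "+1.06x faster"); it is not taken from the benchmark script. `change_label` and `NOISE_THRESHOLD` are hypothetical names used only for this sketch.

```rust
/// Assumed noise band (~5%), inferred from the published tables,
/// not copied from the benchmark comparison script.
const NOISE_THRESHOLD: f64 = 1.05;

/// Label a (baseline, candidate) timing pair the way the Change column appears to.
/// ratio = baseline / candidate, so ratio > 1 means the candidate is faster.
fn change_label(baseline_ms: f64, candidate_ms: f64) -> String {
    let ratio = baseline_ms / candidate_ms;
    if ratio > NOISE_THRESHOLD {
        // Candidate is faster by more than the noise band.
        format!("+{:.2}x faster", ratio)
    } else if ratio < 1.0 / NOISE_THRESHOLD {
        // Candidate is slower; report the inverse ratio as the slowdown.
        format!("{:.2}x slower", 1.0 / ratio)
    } else {
        "no change".to_string()
    }
}

fn main() {
    // (query, main, feature_tokio_unbounded) rows copied from the first table.
    let rows = [
        ("QQuery 4", 84.27, 51.39),
        ("QQuery 11", 52.68, 56.47),
        ("QQuery 17", 785.40, 763.14),
    ];
    for (query, baseline, candidate) in rows {
        println!("{query}: {}", change_label(baseline, candidate));
    }
}
```

For these three rows the sketch reproduces the reported labels (+1.64x faster, 1.07x slower, no change); the real script may use a different threshold or rounding.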
Here are the results:

```
Comparing main and feature_tokio_unbounded
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ feature_tokio_unbounded ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 317.52ms │                309.05ms │     no change │
│ QQuery 2     │  73.18ms │                 67.94ms │ +1.08x faster │
│ QQuery 3     │ 136.38ms │                114.46ms │ +1.19x faster │
│ QQuery 4     │  84.27ms │                 51.39ms │ +1.64x faster │
│ QQuery 5     │ 170.56ms │                121.67ms │ +1.40x faster │
│ QQuery 6     │  83.52ms │                 83.22ms │     no change │
│ QQuery 7     │ 249.60ms │                220.18ms │ +1.13x faster │
│ QQuery 8     │ 191.66ms │                175.75ms │ +1.09x faster │
│ QQuery 9     │ 282.38ms │                215.22ms │ +1.31x faster │
│ QQuery 10    │ 230.92ms │                152.78ms │ +1.51x faster │
│ QQuery 11    │  52.68ms │                 56.47ms │  1.07x slower │
│ QQuery 12    │ 153.50ms │                120.02ms │ +1.28x faster │
│ QQuery 13    │ 314.86ms │                309.84ms │     no change │
│ QQuery 14    │ 115.02ms │                114.93ms │     no change │
│ QQuery 15    │  90.32ms │                 93.55ms │     no change │
│ QQuery 16    │  67.44ms │                 62.28ms │ +1.08x faster │
│ QQuery 17    │ 785.40ms │                763.14ms │     no change │
│ QQuery 18    │ 636.27ms │                559.21ms │ +1.14x faster │
│ QQuery 19    │ 232.26ms │                231.57ms │     no change │
│ QQuery 20    │ 261.95ms │                247.18ms │ +1.06x faster │
│ QQuery 21    │ 351.81ms │                247.46ms │ +1.42x faster │
│ QQuery 22    │  54.88ms │                 48.02ms │ +1.14x faster │
└──────────────┴──────────┴─────────────────────────┴───────────────┘
```

```
Comparing feature_flume and feature_tokio_unbounded
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃ feature_flume ┃ feature_tokio_unbounded ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │      317.86ms │                309.05ms │    no change │
│ QQuery 2     │       70.41ms │                 67.94ms │    no change │
│ QQuery 3     │      113.01ms │                114.46ms │    no change │
│ QQuery 4     │       51.30ms │                 51.39ms │    no change │
│ QQuery 5     │      123.28ms │                121.67ms │    no change │
│ QQuery 6     │       81.93ms │                 83.22ms │    no change │
│ QQuery 7     │      220.84ms │                220.18ms │    no change │
│ QQuery 8     │      175.73ms │                175.75ms │    no change │
│ QQuery 9     │      213.37ms │                215.22ms │    no change │
│ QQuery 10    │      153.20ms │                152.78ms │    no change │
│ QQuery 11    │       54.10ms │                 56.47ms │    no change │
│ QQuery 12    │      119.72ms │                120.02ms │    no change │
│ QQuery 13    │      313.01ms │                309.84ms │    no change │
│ QQuery 14    │      115.82ms │                114.93ms │    no change │
│ QQuery 15    │       89.26ms │                 93.55ms │    no change │
│ QQuery 16    │       61.57ms │                 62.28ms │    no change │
│ QQuery 17    │      786.18ms │                763.14ms │    no change │
│ QQuery 18    │      491.24ms │                559.21ms │ 1.14x slower │
│ QQuery 19    │      231.82ms │                231.57ms │    no change │
│ QQuery 20    │      240.57ms │                247.18ms │    no change │
│ QQuery 21    │      239.96ms │                247.46ms │    no change │
│ QQuery 22    │       49.39ms │                 48.02ms │    no change │
└──────────────┴───────────────┴─────────────────────────┴──────────────┘
```

From the above results, we can see that:

- replacing the custom channel with the unbounded `tokio::sync::mpsc` channel also improves performance substantially: 14 of the 22 queries are faster (up to 1.64x on Q4), and only Q11 is slower (1.07x);
- flume beats the tokio channel on just one query (Q18: 491.24ms vs 559.21ms); every other query shows no change.

To reproduce the results, you can find the [code](https://github.com/YjyJeff/arrow-datafusion/tree/feature/tokio_unbounded) here.

In my view, avoiding high memory usage is good, but we should not sacrifice performance for it.

--
This is an automated message from the Apache Git Service.
