jackwener commented on code in PR #4219:
URL: https://github.com/apache/arrow-datafusion/pull/4219#discussion_r1027263477


##########
datafusion/core/src/physical_optimizer/join_selection.rs:
##########
@@ -0,0 +1,972 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::SessionConfig;
+use crate::logical_expr::JoinType;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec, PartitionMode,
+};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use crate::error::Result;
+use crate::physical_plan::rewrite::TreeNodeRewritable;
+
+/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection 
rule will make
+/// a cost-based decision to select which PartitionMode 
(Partitioned/CollectLeft) is optimal
+/// based on the available statistics that the inputs have.
+/// If the statistics information is not available, the partition mode will 
fall back to [PartitionMode::Partitioned].
+///
+/// JoinSelection rule will also reorder the build and probe phase of the hash 
joins
+/// based on the available statistics that the inputs have.
+/// The rule optimizes the order such that the left (build) side of the join 
is the smallest.
+/// If the statistics information is not available, the order stays the same 
as the original query.
+/// JoinSelection rule will also swap the left and right sides for cross join 
so that the left side
+/// is the smallest.
+#[derive(Default)]
+pub struct JoinSelection {}
+
+impl JoinSelection {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+// TODO we need some performance test for Right Semi/Right Join swap to Left 
Semi/Left Join in case that the right side is smaller but not much smaller.
+// TODO In PrestoSQL, the optimizer flips join sides only if one side is much 
smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default 
it is 8 times.
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) 
-> bool {
+    // Get the left and right table's total bytes
+    // If both the left and right tables contain total_byte_size statistics,
+    // use `total_byte_size` to determine `should_swap_join_order`, else use 
`num_rows`
+    let (left_size, right_size) = match (
+        left.statistics().total_byte_size,
+        right.statistics().total_byte_size,
+    ) {
+        (Some(l), Some(r)) => (Some(l), Some(r)),
+        _ => (left.statistics().num_rows, right.statistics().num_rows),
+    };
+
+    match (left_size, right_size) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_collect_by_size(
+    plan: &dyn ExecutionPlan,
+    collection_size_threshold: usize,
+) -> bool {
+    // Currently we do not trust the 0 value from stats, because stats 
collection might have a bug
+    // TODO check the logic in datasource::get_statistics_with_limit()
+    if let Some(size) = plan.statistics().total_byte_size {
+        size != 0 && size < collection_size_threshold
+    } else if let Some(row_count) = plan.statistics().num_rows {
+        row_count != 0 && row_count < collection_size_threshold
+    } else {
+        false
+    }
+}
+
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::Right
+        | JoinType::Full
+        | JoinType::LeftSemi
+        | JoinType::RightSemi
+        | JoinType::LeftAnti
+        | JoinType::RightAnti => true,
+    }
+}
+
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    match join_type {
+        JoinType::Inner => JoinType::Inner,
+        JoinType::Full => JoinType::Full,
+        JoinType::Left => JoinType::Right,
+        JoinType::Right => JoinType::Left,
+        JoinType::LeftSemi => JoinType::RightSemi,
+        JoinType::RightSemi => JoinType::LeftSemi,
+        JoinType::LeftAnti => JoinType::RightAnti,
+        JoinType::RightAnti => JoinType::LeftAnti,
+    }
+}
+
+fn swap_hash_join(
+    hash_join: &HashJoinExec,
+    partition_mode: PartitionMode,
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let new_join = HashJoinExec::try_new(
+        Arc::clone(right),
+        Arc::clone(left),
+        hash_join
+            .on()
+            .iter()
+            .map(|(l, r)| (r.clone(), l.clone()))
+            .collect(),
+        swap_join_filter(hash_join.filter()),
+        &swap_join_type(*hash_join.join_type()),
+        partition_mode,
+        hash_join.null_equals_null(),
+    )?;
+    if matches!(
+        hash_join.join_type(),
+        JoinType::LeftSemi
+            | JoinType::RightSemi
+            | JoinType::LeftAnti
+            | JoinType::RightAnti
+    ) {
+        Ok(Arc::new(new_join))
+    } else {
+        // TODO avoid adding ProjectionExec again and again, only adding Final 
Projection

Review Comment:
   Very careful 👍! I also ran into this problem in Doris.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to