This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 12c4c862f8 feat: Use `SchemaRef` in `JoinFilter` (#14182)
12c4c862f8 is described below

commit 12c4c862f805a324beea0e24417d8c565f462ed4
Author: irenjj <[email protected]>
AuthorDate: Mon Jan 20 09:07:00 2025 +0800

    feat: Use `SchemaRef` in `JoinFilter` (#14182)
    
    * feat: Use `SchemaRef` in `JoinFilter`
    
    * Update datafusion/core/src/physical_optimizer/projection_pushdown.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/physical-plan/src/joins/join_filter.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/physical-plan/src/joins/join_filter.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/physical-plan/src/joins/join_filter.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * fix
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../src/physical_optimizer/projection_pushdown.rs  | 16 ++++-----
 datafusion/core/src/physical_planner.rs            |  2 +-
 datafusion/core/tests/fuzz_cases/join_fuzz.rs      |  7 ++--
 .../physical-optimizer/src/join_selection.rs       |  2 +-
 datafusion/physical-plan/src/joins/hash_join.rs    | 42 +++++++++++++++-------
 datafusion/physical-plan/src/joins/join_filter.rs  | 10 +++---
 .../physical-plan/src/joins/nested_loop_join.rs    | 12 +++++--
 .../physical-plan/src/joins/sort_merge_join.rs     | 10 +++---
 .../physical-plan/src/joins/stream_join_utils.rs   |  9 +++--
 .../physical-plan/src/joins/symmetric_hash_join.rs | 36 ++++++++++++-------
 datafusion/proto/src/physical_plan/mod.rs          | 12 +++----
 11 files changed, 98 insertions(+), 60 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs 
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 2f1cf3a8e6..cf8d5e352e 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1255,7 +1255,7 @@ fn update_join_filter(
                     side: col_idx.side,
                 })
                 .collect(),
-            join_filter.schema().clone(),
+            Arc::clone(join_filter.schema()),
         )
     })
 }
@@ -2246,11 +2246,11 @@ mod tests {
                         side: JoinSide::Left,
                     },
                 ],
-                Schema::new(vec![
+                Arc::new(Schema::new(vec![
                     Field::new("b_left_inter", DataType::Int32, true),
                     Field::new("a_right_inter", DataType::Int32, true),
                     Field::new("c_left_inter", DataType::Int32, true),
-                ]),
+                ])),
             )),
             &JoinType::Inner,
             true,
@@ -2360,11 +2360,11 @@ mod tests {
                         side: JoinSide::Left,
                     },
                 ],
-                Schema::new(vec![
+                Arc::new(Schema::new(vec![
                     Field::new("b_left_inter", DataType::Int32, true),
                     Field::new("a_right_inter", DataType::Int32, true),
                     Field::new("c_left_inter", DataType::Int32, true),
-                ]),
+                ])),
             )),
             &JoinType::Inner,
             true,
@@ -2462,7 +2462,7 @@ mod tests {
             Some(JoinFilter::new(
                 filter_expr,
                 filter_column_indices,
-                filter_schema,
+                Arc::new(filter_schema),
             )),
             &JoinType::Inner,
             None,
@@ -2536,11 +2536,11 @@ mod tests {
                         side: JoinSide::Left,
                     },
                 ],
-                Schema::new(vec![
+                Arc::new(Schema::new(vec![
                     Field::new("b_left_inter", DataType::Int32, true),
                     Field::new("a_right_inter", DataType::Int32, true),
                     Field::new("c_left_inter", DataType::Int32, true),
-                ]),
+                ])),
             )),
             &JoinType::Inner,
             None,
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 77bf4bb71d..03ba5688de 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1090,7 +1090,7 @@ impl DefaultPhysicalPlanner {
                         Some(join_utils::JoinFilter::new(
                             filter_expr,
                             column_indices,
-                            filter_schema,
+                            Arc::new(filter_schema),
                         ))
                     }
                     _ => None,
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 04eb48e41c..41c12193f0 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -86,7 +86,7 @@ fn col_lt_col_filter(schema1: Arc<Schema>, schema2: 
Arc<Schema>) -> JoinFilter {
             .with_nullable(true),
     ]);
 
-    JoinFilter::new(less_filter, column_indices, intermediate_schema)
+    JoinFilter::new(less_filter, column_indices, Arc::new(intermediate_schema))
 }
 
 #[tokio::test]
@@ -327,7 +327,7 @@ impl JoinFuzzTestCase {
     /// on-condition schema
     fn intermediate_schema(&self) -> Schema {
         let filter_schema = if let Some(filter) = self.join_filter() {
-            filter.schema().to_owned()
+            filter.schema().as_ref().to_owned()
         } else {
             Schema::empty()
         };
@@ -483,7 +483,8 @@ impl JoinFuzzTestCase {
         let intermediate_schema = self.intermediate_schema();
         let expression = self.composite_filter_expression();
 
-        let filter = JoinFilter::new(expression, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(expression, column_indices, 
Arc::new(intermediate_schema));
 
         Arc::new(
             NestedLoopJoinExec::try_new(left, right, Some(filter), 
&self.join_type, None)
diff --git a/datafusion/physical-optimizer/src/join_selection.rs 
b/datafusion/physical-optimizer/src/join_selection.rs
index 5f7f1f396a..d5f70938a7 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -716,7 +716,7 @@ mod tests_statistical {
         Some(JoinFilter::new(
             expression,
             column_indices,
-            intermediate_schema,
+            Arc::new(intermediate_schema),
         ))
     }
 
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs
index 8ccefb0397..d130b42a9d 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -2576,7 +2576,7 @@ mod tests {
         let filter = JoinFilter::new(
             filter_expression,
             column_indices.clone(),
-            intermediate_schema.clone(),
+            Arc::new(intermediate_schema.clone()),
         );
 
         let join = join_with_filter(
@@ -2611,8 +2611,11 @@ mod tests {
             Operator::Gt,
             Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
         )) as Arc<dyn PhysicalExpr>;
-        let filter =
-            JoinFilter::new(filter_expression, column_indices, 
intermediate_schema);
+        let filter = JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema),
+        );
 
         let join = join_with_filter(left, right, on, filter, 
&JoinType::LeftSemi, false)?;
 
@@ -2700,7 +2703,7 @@ mod tests {
         let filter = JoinFilter::new(
             filter_expression,
             column_indices.clone(),
-            intermediate_schema.clone(),
+            Arc::new(intermediate_schema.clone()),
         );
 
         let join = join_with_filter(
@@ -2738,8 +2741,11 @@ mod tests {
             Arc::new(Literal::new(ScalarValue::Int32(Some(11)))),
         )) as Arc<dyn PhysicalExpr>;
 
-        let filter =
-            JoinFilter::new(filter_expression, column_indices, 
intermediate_schema);
+        let filter = JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema.clone()),
+        );
 
         let join =
             join_with_filter(left, right, on, filter, &JoinType::RightSemi, 
false)?;
@@ -2822,7 +2828,7 @@ mod tests {
         let filter = JoinFilter::new(
             filter_expression,
             column_indices.clone(),
-            intermediate_schema.clone(),
+            Arc::new(intermediate_schema.clone()),
         );
 
         let join = join_with_filter(
@@ -2861,8 +2867,11 @@ mod tests {
             Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
         )) as Arc<dyn PhysicalExpr>;
 
-        let filter =
-            JoinFilter::new(filter_expression, column_indices, 
intermediate_schema);
+        let filter = JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema),
+        );
 
         let join = join_with_filter(left, right, on, filter, 
&JoinType::LeftAnti, false)?;
 
@@ -2951,7 +2960,7 @@ mod tests {
         let filter = JoinFilter::new(
             filter_expression,
             column_indices,
-            intermediate_schema.clone(),
+            Arc::new(intermediate_schema.clone()),
         );
 
         let join = join_with_filter(
@@ -2995,8 +3004,11 @@ mod tests {
             Arc::new(Literal::new(ScalarValue::Int32(Some(8)))),
         )) as Arc<dyn PhysicalExpr>;
 
-        let filter =
-            JoinFilter::new(filter_expression, column_indices, 
intermediate_schema);
+        let filter = JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema),
+        );
 
         let join =
             join_with_filter(left, right, on, filter, &JoinType::RightAnti, 
false)?;
@@ -3359,7 +3371,11 @@ mod tests {
             Arc::new(Column::new("c", 1)),
         )) as Arc<dyn PhysicalExpr>;
 
-        JoinFilter::new(filter_expression, column_indices, intermediate_schema)
+        JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema),
+        )
     }
 
     #[apply(batch_sizes)]
diff --git a/datafusion/physical-plan/src/joins/join_filter.rs 
b/datafusion/physical-plan/src/joins/join_filter.rs
index b99afd87c9..cfc7ad2c10 100644
--- a/datafusion/physical-plan/src/joins/join_filter.rs
+++ b/datafusion/physical-plan/src/joins/join_filter.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::joins::utils::ColumnIndex;
-use arrow_schema::Schema;
+use arrow_schema::SchemaRef;
 use datafusion_common::JoinSide;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use std::sync::Arc;
@@ -30,7 +30,7 @@ pub struct JoinFilter {
     /// Column indices required to construct intermediate batch for filtering
     pub(crate) column_indices: Vec<ColumnIndex>,
     /// Physical schema of intermediate batch
-    pub(crate) schema: Schema,
+    pub(crate) schema: SchemaRef,
 }
 
 impl JoinFilter {
@@ -38,7 +38,7 @@ impl JoinFilter {
     pub fn new(
         expression: Arc<dyn PhysicalExpr>,
         column_indices: Vec<ColumnIndex>,
-        schema: Schema,
+        schema: SchemaRef,
     ) -> JoinFilter {
         JoinFilter {
             expression,
@@ -76,7 +76,7 @@ impl JoinFilter {
     }
 
     /// Intermediate batch schema
-    pub fn schema(&self) -> &Schema {
+    pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
@@ -94,7 +94,7 @@ impl JoinFilter {
         JoinFilter::new(
             Arc::clone(self.expression()),
             column_indices,
-            self.schema().clone(),
+            Arc::clone(self.schema()),
         )
     }
 }
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs 
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index f416694e38..d2fa2fdc7b 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1104,7 +1104,11 @@ pub(crate) mod tests {
             Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
                 as Arc<dyn PhysicalExpr>;
 
-        JoinFilter::new(filter_expression, column_indices, intermediate_schema)
+        JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema),
+        )
     }
 
     pub(crate) async fn multi_partitioned_join_collect(
@@ -1514,7 +1518,11 @@ pub(crate) mod tests {
             Arc::new(BinaryExpr::new(left_filter, Operator::And, right_filter))
                 as Arc<dyn PhysicalExpr>;
 
-        JoinFilter::new(filter_expression, column_indices, intermediate_schema)
+        JoinFilter::new(
+            filter_expression,
+            column_indices,
+            Arc::new(intermediate_schema),
+        )
     }
 
     fn generate_columns(num_columns: usize, num_rows: usize) -> Vec<Vec<i32>> {
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index bcacc7dcae..650d4c1de1 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1759,10 +1759,8 @@ impl SortMergeJoinStream {
             if !filter_columns.is_empty() {
                 if let Some(f) = &self.filter {
                     // Construct batch with only filter columns
-                    let filter_batch = RecordBatch::try_new(
-                        Arc::new(f.schema().clone()),
-                        filter_columns,
-                    )?;
+                    let filter_batch =
+                        RecordBatch::try_new(Arc::clone(f.schema()), 
filter_columns)?;
 
                     let filter_result = f
                         .expression()
@@ -3182,10 +3180,10 @@ mod tests {
                     side: JoinSide::Right,
                 },
             ],
-            Schema::new(vec![
+            Arc::new(Schema::new(vec![
                 Field::new("c1", DataType::Int32, true),
                 Field::new("c2", DataType::Int32, true),
-            ]),
+            ])),
         );
         let (_, batches) =
             join_collect_with_filter(left, right, on, filter, 
RightAnti).await?;
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs 
b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index cea04ccad3..6d4f06b3ae 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -856,7 +856,8 @@ pub mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         let left_sort_filter_expr = build_filter_input_order(
             JoinSide::Left,
@@ -983,7 +984,8 @@ pub mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         let left_schema = Arc::new(left_schema);
         let right_schema = Arc::new(right_schema);
@@ -1055,7 +1057,8 @@ pub mod tests {
                 side: JoinSide::Left,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         let schema = Schema::new(vec![
             Field::new("a", DataType::Int32, false),
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs 
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 72fd5a0feb..b050d3adfe 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -1799,7 +1799,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -1864,7 +1865,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -1916,7 +1918,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -2013,7 +2016,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -2071,7 +2075,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
@@ -2129,7 +2134,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -2189,7 +2195,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -2246,7 +2253,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -2310,7 +2318,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
@@ -2394,7 +2403,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
     }
@@ -2468,7 +2478,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
 
         Ok(())
@@ -2539,7 +2550,8 @@ mod tests {
                 side: JoinSide::Right,
             },
         ];
-        let filter = JoinFilter::new(filter_expr, column_indices, 
intermediate_schema);
+        let filter =
+            JoinFilter::new(filter_expr, column_indices, 
Arc::new(intermediate_schema));
 
         experiment(left, right, Some(filter), join_type, on, task_ctx).await?;
         Ok(())
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 25fca611df..a266d55b46 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -625,7 +625,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             })
                             .collect::<Result<Vec<_>>>()?;
 
-                        Ok(JoinFilter::new(expression, column_indices, schema))
+                        Ok(JoinFilter::new(expression, column_indices, 
Arc::new(schema)))
                     })
                     .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
 
@@ -739,7 +739,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             })
                             .collect::<Result<_>>()?;
 
-                        Ok(JoinFilter::new(expression, column_indices, schema))
+                        Ok(JoinFilter::new(expression, column_indices, 
Arc::new(schema)))
                     })
                     .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
 
@@ -992,7 +992,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             })
                             .collect::<Result<Vec<_>>>()?;
 
-                        Ok(JoinFilter::new(expression, column_indices, schema))
+                        Ok(JoinFilter::new(expression, column_indices, 
Arc::new(schema)))
                     })
                     .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
 
@@ -1316,7 +1316,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             }
                         })
                         .collect();
-                    let schema = f.schema().try_into()?;
+                    let schema = f.schema().as_ref().try_into()?;
                     Ok(protobuf::JoinFilter {
                         expression: Some(expression),
                         column_indices,
@@ -1388,7 +1388,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             }
                         })
                         .collect();
-                    let schema = f.schema().try_into()?;
+                    let schema = f.schema().as_ref().try_into()?;
                     Ok(protobuf::JoinFilter {
                         expression: Some(expression),
                         column_indices,
@@ -1833,7 +1833,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             }
                         })
                         .collect();
-                    let schema = f.schema().try_into()?;
+                    let schema = f.schema().as_ref().try_into()?;
                     Ok(protobuf::JoinFilter {
                         expression: Some(expression),
                         column_indices,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to