Dandandan commented on a change in pull request #9038:
URL: https://github.com/apache/arrow/pull/9038#discussion_r549551740



##########
File path: rust/datafusion/src/physical_plan/expressions.rs
##########
@@ -2414,6 +2417,129 @@ impl PhysicalSortExpr {
     }
 }
 
+/// IN list expression (`expr IN (list)`): tests whether the value of `expr`
+/// is contained in the values produced by the expressions in `list`.
+#[derive(Debug)]
+pub struct InListExpr {
+    /// Expression whose value is looked up in `list`.
+    expr: Arc<dyn PhysicalExpr>,
+    /// Expressions that make up the IN list.
+    list: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+impl InListExpr {
+    /// Create a new `InListExpr` testing whether `expr` is contained in `list`.
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        list: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Self {
+        Self { expr, list }
+    }
+
+    /// Evaluate `array IN (list_values)` for an `i32`-offset UTF-8 string array.
+    ///
+    /// Every entry of `list_values` must be a non-null `ScalarValue::Utf8`
+    /// literal. Any other entry (NULL, a different scalar type, or an array) is
+    /// reported as an error instead of panicking.
+    ///
+    /// # Errors
+    ///
+    /// Returns `DataFusionError::Internal` when `array` is not a Utf8 array or
+    /// a list value is an array. Returns `DataFusionError::NotImplemented` for
+    /// unsupported scalar list values. Arrow errors from building the list or
+    /// from the comparison kernel are propagated as `ArrowError`.
+    fn compare_utf8(
+        &self,
+        array: Arc<dyn Array>,
+        list_values: Vec<ColumnarValue>,
+    ) -> Result<ColumnarValue> {
+        let array = array
+            .as_any()
+            .downcast_ref::<GenericStringArray<i32>>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(
+                    "InList expected a Utf8 array for the tested expression".to_owned(),
+                )
+            })?;
+
+        // Validate and extract the literal strings once; the list is the same
+        // for every row, so there is no reason to re-match it per row.
+        let literals = list_values
+            .iter()
+            .map(|value| match value {
+                ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) => Ok(v.as_str()),
+                ColumnarValue::Scalar(other) => Err(DataFusionError::NotImplemented(
+                    format!("InList does not yet support the list value {:?}", other),
+                )),
+                ColumnarValue::Array(_) => Err(DataFusionError::Internal(
+                    "InList should not receive Array".to_owned(),
+                )),
+            })
+            .collect::<Result<Vec<&str>>>()?;
+
+        // Build the ListArray expected by comparison::contains_utf8, which
+        // compares row i of `array` with row i of the list array. Therefore
+        // the same literals are repeated once per input row.
+        let values_builder = StringBuilder::new(literals.len() * array.len());
+        let mut builder = ListBuilder::new(values_builder);
+        for _ in 0..array.len() {
+            for &literal in &literals {
+                builder
+                    .values()
+                    .append_value(literal)
+                    .map_err(DataFusionError::ArrowError)?;
+            }
+            builder.append(true).map_err(DataFusionError::ArrowError)?;
+        }
+        let list_array = builder.finish();
+
+        Ok(ColumnarValue::Array(Arc::new(
+            kernels::comparison::contains_utf8(array, &list_array)
+                .map_err(DataFusionError::ArrowError)?,
+        )))
+    }
+}
+
+impl fmt::Display for InListExpr {
+    /// Formats as `expr IN (item1, item2, ...)`, using the `Display` form of
+    /// every expression rather than the `Debug` form of the list.
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{} IN (", self.expr)?;
+        for (i, item) in self.list.iter().enumerate() {
+            if i > 0 {
+                write!(f, ", ")?;
+            }
+            write!(f, "{}", item)?;
+        }
+        write!(f, ")")
+    }
+}
+
+impl PhysicalExpr for InListExpr {
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+        self.expr.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let value = self.expr.evaluate(batch)?;
+        let value_data_type = value.data_type();
+
+        let list_values = self
+            .list
+            .iter()
+            .map(|expr| expr.evaluate(batch))
+            .collect::<Result<Vec<_>>>()?;
+        let list_values_data_types = list_values
+            .iter()
+            .map(|expr| expr.data_type())
+            .collect::<Vec<DataType>>();
+
+        if list_values_data_types
+            .iter()
+            .any(|dt| *dt != value_data_type)
+        {
+            return Err(DataFusionError::Internal(format!(

Review comment:
       I think we should do this type check earlier, when creating/validating the 
logical plan.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to