Kimahriman commented on code in PR #1073:
URL: https://github.com/apache/datafusion-comet/pull/1073#discussion_r1862550710

##########
native/spark-expr/src/list.rs:
##########
@@ -413,14 +426,297 @@ impl PartialEq<dyn Any> for GetArrayStructFields {
     }
 }
+#[derive(Debug, Hash)]
+pub struct ArrayInsert {
+    src_array_expr: Arc<dyn PhysicalExpr>,
+    pos_expr: Arc<dyn PhysicalExpr>,
+    item_expr: Arc<dyn PhysicalExpr>,
+    legacy_negative_index: bool,
+}
+
+impl ArrayInsert {
+    pub fn new(
+        src_array_expr: Arc<dyn PhysicalExpr>,
+        pos_expr: Arc<dyn PhysicalExpr>,
+        item_expr: Arc<dyn PhysicalExpr>,
+        legacy_negative_index: bool,
+    ) -> Self {
+        Self {
+            src_array_expr,
+            pos_expr,
+            item_expr,
+            legacy_negative_index,
+        }
+    }
+}
+
+impl PhysicalExpr for ArrayInsert {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> DataFusionResult<DataType> {
+        match self.src_array_expr.data_type(input_schema)? {
+            DataType::List(field) => Ok(DataType::List(field)),
+            DataType::LargeList(field) => Ok(DataType::LargeList(field)),
+            data_type => Err(DataFusionError::Internal(format!(
+                "Unexpected data type in ArrayInsert: {:?}",
+                data_type
+            ))),
+        }
+    }
+
+    fn nullable(&self, input_schema: &Schema) -> DataFusionResult<bool> {
+        self.src_array_expr.nullable(input_schema)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult<ColumnarValue> {
+        let pos_value = self
+            .pos_expr
+            .evaluate(batch)?
+            .into_array(batch.num_rows())?;
+
+        // Spark supports only IntegerType (Int32):
+        // https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4737
+        if !matches!(pos_value.data_type(), DataType::Int32) {
+            return Err(DataFusionError::Internal(format!(
+                "Unexpected index data type in ArrayInsert: {:?}, expected type is Int32",
+                pos_value.data_type()
+            )));
+        }
+
+        // Check that src array is actually an array and get it's value type
+        let src_value = self
+            .src_array_expr
+            .evaluate(batch)?
+            .into_array(batch.num_rows())?;
+        let src_element_type = match src_value.data_type() {
+            DataType::List(field) => field.data_type(),
+            DataType::LargeList(field) => field.data_type(),
+            data_type => {
+                return Err(DataFusionError::Internal(format!(
+                    "Unexpected src array type in ArrayInsert: {:?}",
+                    data_type
+                )))
+            }

Review Comment:
I think the difference is that a `LargeList` can store more than `Integer.MAX_VALUE` entries across _all rows in a single batch_. An Arrow `List` uses 32-bit offsets into one shared child array, so the cap applies to the total number of elements in the array, not to each row. If you have multiple Spark rows that each hold the maximum number of elements supported, they wouldn't fit into a single Arrow `List` array. That would probably need to be supported elsewhere too, but it may be worth keeping the `LargeList` handling around in case that scenario is ever supported. And might other DataFusion expressions return a `LargeList` even when the data doesn't come directly from Spark? Does the native Parquet reader ever use a `LargeList`?
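
As a rough illustration of the offset limit discussed in the review comment, the sketch below uses only the Rust standard library, not the `arrow` crate. The helpers `total_elements`, `fits_in_list_offsets`, and `fits_in_large_list_offsets` are hypothetical names introduced for this example. They model the fact that an Arrow `List` stores `i32` offsets while a `LargeList` stores `i64` offsets, so the limit applies to the sum of all row lengths in one array. Treating `i32::MAX` as the per-row bound is an assumption taken from the `Integer.MAX_VALUE` figure in the comment.

```rust
/// Hypothetical helper: total number of child elements that one list array
/// would need to address for the given per-row lengths.
/// Returns `None` if the sum overflows `u64`.
fn total_elements(row_lengths: &[u64]) -> Option<u64> {
    row_lengths
        .iter()
        .try_fold(0u64, |acc, &len| acc.checked_add(len))
}

/// Arrow `List` offsets are `i32`, and the last offset equals the total
/// element count, so that total must not exceed `i32::MAX`.
fn fits_in_list_offsets(row_lengths: &[u64]) -> bool {
    total_elements(row_lengths).map_or(false, |total| total <= i32::MAX as u64)
}

/// Arrow `LargeList` offsets are `i64`, so the bound is `i64::MAX` instead.
fn fits_in_large_list_offsets(row_lengths: &[u64]) -> bool {
    total_elements(row_lengths).map_or(false, |total| total <= i64::MAX as u64)
}
```

Under these assumptions, `fits_in_list_offsets(&[i32::MAX as u64])` holds for a single maximal row, while two such rows in the same batch only pass `fits_in_large_list_offsets`.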