This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bdc56ca3d Add bounds check to Column physical expression (#3238)
bdc56ca3d is described below
commit bdc56ca3d9b3149267682cf4a95692251269efec
Author: Andy Grove <[email protected]>
AuthorDate: Wed Aug 24 06:26:44 2022 -0600
Add bounds check to Column physical expression (#3238)
---
datafusion/physical-expr/src/expressions/column.rs | 65 +++++++++++++++++++++-
1 file changed, 64 insertions(+), 1 deletion(-)
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 3def89f78..63f8c4053 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -25,7 +25,7 @@ use arrow::{
};
use crate::PhysicalExpr;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
/// Represents the column at a given index in a RecordBatch
@@ -74,21 +74,84 @@ impl PhysicalExpr for Column {
/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+ self.bounds_check(input_schema)?;
Ok(input_schema.field(self.index).data_type().clone())
}
/// Decide whether this expression is nullable, given the schema of the input
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
+ self.bounds_check(input_schema)?;
Ok(input_schema.field(self.index).is_nullable())
}
/// Evaluate the expression
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+ self.bounds_check(batch.schema().as_ref())?;
Ok(ColumnarValue::Array(batch.column(self.index).clone()))
}
}
+impl Column {
+ fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
+ if self.index < input_schema.fields.len() {
+ Ok(())
+ } else {
+ Err(DataFusionError::Internal(format!(
+ "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
+ self.name,
+ self.index, input_schema.fields.len(), input_schema.fields().iter().map(|f| f.name().clone()).collect::<Vec<String>>())))
+ }
+ }
+}
+
/// Create a column expression
pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Column::new_with_schema(name, schema)?))
}
+
+#[cfg(test)]
+mod test {
+ use crate::expressions::Column;
+ use crate::PhysicalExpr;
+ use arrow::array::StringArray;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::record_batch::RecordBatch;
+ use datafusion_common::Result;
+ use std::sync::Arc;
+
+ #[test]
+ fn out_of_bounds_data_type() {
+ let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
+ let col = Column::new("id", 9);
+ let error = col.data_type(&schema).expect_err("error");
+ assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
+ but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \
+ DataFusion's code and we would welcome that you file an bug report in our issue tracker",
+ &format!("{}", error))
+ }
+
+ #[test]
+ fn out_of_bounds_nullable() {
+ let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
+ let col = Column::new("id", 9);
+ let error = col.nullable(&schema).expect_err("error");
+ assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
+ but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \
+ DataFusion's code and we would welcome that you file an bug report in our issue tracker",
+ &format!("{}", error))
+ }
+
+ #[test]
+ fn out_of_bounds_evaluate() -> Result<()> {
+ let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
+ let data: StringArray = vec!["data"].into();
+ let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?;
+ let col = Column::new("id", 9);
+ let error = col.evaluate(&batch).expect_err("error");
+ assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
+ but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \
+ DataFusion's code and we would welcome that you file an bug report in our issue tracker",
+ &format!("{}", error));
+ Ok(())
+ }
+}