Denys-Bushulyak commented on issue #9405:
URL:
https://github.com/apache/arrow-datafusion/issues/9405#issuecomment-1973590407
`tests/select_and_test.rs`
```rust
use std::{any::Any, fmt, sync::Arc};
use axum::async_trait;
use datafusion::{
arrow::{
array::{ArrayRef, Int32Array, RecordBatch, StringArray},
datatypes::{DataType, Field, Schema, SchemaRef},
util::pretty::pretty_format_batches,
},
catalog::{schema::SchemaProvider, CatalogProvider},
datasource::{TableProvider, TableType},
execution::context::{SessionContext, SessionState},
logical_expr::{Expr, TableProviderFilterPushDown},
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType,
ExecutionPlan,
},
};
/// Runs a query whose `WHERE` clause is a conjunction of two predicates against
/// the custom catalog and checks that only the row with `id = 2` survives.
#[tokio::test]
async fn test_selection_with_and() {
    let ctx = SessionContext::new();
    ctx.register_catalog("test", Arc::new(TestCatalogProvider()));

    let results = ctx
        .sql("SELECT * FROM test.default.test_table WHERE id > 1 AND id < 3")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();

    // Compare line by line: the rendered table pads every cell to its column
    // width, and trimming makes the check independent of a trailing newline.
    let formatted = pretty_format_batches(&results).unwrap().to_string();
    let actual: Vec<&str> = formatted.trim().lines().collect();
    let expected = vec![
        "+----+-------+",
        "| id | title |",
        "+----+-------+",
        "| 2  | b     |",
        "+----+-------+",
    ];
    assert_eq!(expected, actual, "unexpected query output:\n{}", formatted);
}
struct TestCatalogProvider();
impl CatalogProvider for TestCatalogProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema_names(&self) -> Vec<String> {
vec!["default".to_string()]
}
fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
Some(Arc::new(TestSchemaProvider))
}
}
/// Schema that contains exactly one table, `test_table`.
struct TestSchemaProvider;

#[async_trait]
impl SchemaProvider for TestSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Names of the tables in this schema.
    fn table_names(&self) -> Vec<String> {
        vec![String::from("test_table")]
    }

    /// Builds a provider for `test_table` with the columns `id: Int32` and
    /// `title: Utf8` (both non-nullable); any other name yields `None`.
    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        if name != "test_table" {
            return None;
        }

        let columns = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("title", DataType::Utf8, false),
        ];
        let provider: Arc<dyn TableProvider> = Arc::new(TestTableProvider {
            schema: Arc::new(Schema::new(columns)),
        });
        Some(provider)
    }

    /// Reports whether `name` is the single table this schema knows about.
    fn table_exist(&self, name: &str) -> bool {
        matches!(name, "test_table")
    }
}
struct TestTableProvider {
schema: SchemaRef,
}
#[async_trait]
impl TableProvider for TestTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::View
}
async fn scan(
&self,
_state: &SessionState,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
// schema for execution plan depends on the projection,
// that's why we need to create a new schema
let schema = {
let fields = match projection {
Some(projection) => projection
.iter()
.map(|index| self.schema.field(*index).clone())
.collect(),
None => self.schema.fields().clone(),
};
Arc::new(Schema::new(fields))
};
Ok(Arc::new(TestExecutionPlan { schema }))
}
fn supports_filters_pushdown(
&self,
_filters: &[&datafusion::prelude::Expr],
) ->
datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>>
{
Ok(vec![TableProviderFilterPushDown::Unsupported])
}
}
/// Physical plan node that carries only the schema it was created with.
#[derive(Debug)]
struct TestExecutionPlan {
    schema: SchemaRef,
}

impl DisplayAs for TestExecutionPlan {
    /// Writes the fixed label `TestExecutionPlan`, whatever format type is requested.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("TestExecutionPlan")
    }
}
impl ExecutionPlan for TestExecutionPlan {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning
{
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) ->
Option<&[datafusion::physical_expr::PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
_partition: usize,
_context: Arc<datafusion::execution::context::TaskContext>,
) ->
datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream>
{
let batch = RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec![
"a".to_string(),
"b".to_string(),
"c".to_string(),
])) as ArrayRef,
],
);
let fut = futures::future::ready(batch.map_err(|e| e.into()));
let stream = futures::stream::once(fut);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
stream,
)))
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]