This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e52f844  Add partitioned_csv setup code to sql_integration test (#1743)
e52f844 is described below

commit e52f844427cc80fc6551851117266f5e2372ec6c
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Feb 5 06:10:35 2022 -0500

    Add partitioned_csv setup code to sql_integration test (#1743)
---
 datafusion/src/execution/context.rs     | 239 +-------------------------------
 datafusion/tests/sql/mod.rs             |   1 +
 datafusion/tests/sql/partitioned_csv.rs |  95 +++++++++++++
 datafusion/tests/sql/projection.rs      | 192 +++++++++++++++++++++++++
 datafusion/tests/sql/select.rs          |  59 +++++++-
 5 files changed, 347 insertions(+), 239 deletions(-)

diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index fb271a1..96e49c8 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -1281,11 +1281,9 @@ mod tests {
     use super::*;
     use crate::execution::context::QueryPlanner;
     use crate::from_slice::FromSlice;
-    use crate::logical_plan::plan::Projection;
-    use crate::logical_plan::TableScan;
     use crate::logical_plan::{binary_expr, lit, Operator};
+    use crate::physical_plan::collect;
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
-    use crate::physical_plan::{collect, collect_partitioned};
     use crate::test;
     use crate::variable::VarType;
     use crate::{
@@ -1311,7 +1309,6 @@ mod tests {
     use std::thread::{self, JoinHandle};
     use std::{io::prelude::*, sync::Mutex};
     use tempfile::TempDir;
-    use test::*;
 
     #[tokio::test]
     async fn shared_memory_and_disk_manager() {
@@ -1348,62 +1345,6 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn parallel_projection() -> Result<()> {
-        let partition_count = 4;
-        let results = execute("SELECT c1, c2 FROM test", partition_count).await?;
-
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 3  | 1  |",
-            "| 3  | 2  |",
-            "| 3  | 3  |",
-            "| 3  | 4  |",
-            "| 3  | 5  |",
-            "| 3  | 6  |",
-            "| 3  | 7  |",
-            "| 3  | 8  |",
-            "| 3  | 9  |",
-            "| 3  | 10 |",
-            "| 2  | 1  |",
-            "| 2  | 2  |",
-            "| 2  | 3  |",
-            "| 2  | 4  |",
-            "| 2  | 5  |",
-            "| 2  | 6  |",
-            "| 2  | 7  |",
-            "| 2  | 8  |",
-            "| 2  | 9  |",
-            "| 2  | 10 |",
-            "| 1  | 1  |",
-            "| 1  | 2  |",
-            "| 1  | 3  |",
-            "| 1  | 4  |",
-            "| 1  | 5  |",
-            "| 1  | 6  |",
-            "| 1  | 7  |",
-            "| 1  | 8  |",
-            "| 1  | 9  |",
-            "| 1  | 10 |",
-            "| 0  | 1  |",
-            "| 0  | 2  |",
-            "| 0  | 3  |",
-            "| 0  | 4  |",
-            "| 0  | 5  |",
-            "| 0  | 6  |",
-            "| 0  | 7  |",
-            "| 0  | 8  |",
-            "| 0  | 9  |",
-            "| 0  | 10 |",
-            "+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
     async fn create_variable_expr() -> Result<()> {
         let tmp_dir = TempDir::new()?;
         let partition_count = 4;
@@ -1448,184 +1389,6 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn parallel_query_with_filter() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-
-        let logical_plan =
-            ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
-        let logical_plan = ctx.optimize(&logical_plan)?;
-
-        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
-        let runtime = ctx.state.lock().runtime_env.clone();
-        let results = collect_partitioned(physical_plan, runtime).await?;
-
-        // note that the order of partitions is not deterministic
-        let mut num_rows = 0;
-        for partition in &results {
-            for batch in partition {
-                num_rows += batch.num_rows();
-            }
-        }
-        assert_eq!(20, num_rows);
-
-        let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 1  | 1  |",
-            "| 1  | 10 |",
-            "| 1  | 2  |",
-            "| 1  | 3  |",
-            "| 1  | 4  |",
-            "| 1  | 5  |",
-            "| 1  | 6  |",
-            "| 1  | 7  |",
-            "| 1  | 8  |",
-            "| 1  | 9  |",
-            "| 2  | 1  |",
-            "| 2  | 10 |",
-            "| 2  | 2  |",
-            "| 2  | 3  |",
-            "| 2  | 4  |",
-            "| 2  | 5  |",
-            "| 2  | 6  |",
-            "| 2  | 7  |",
-            "| 2  | 8  |",
-            "| 2  | 9  |",
-            "+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn projection_on_table_scan() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-        let runtime = ctx.state.lock().runtime_env.clone();
-
-        let table = ctx.table("test")?;
-        let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
-            .project(vec![col("c2")])?
-            .build()?;
-
-        let optimized_plan = ctx.optimize(&logical_plan)?;
-        match &optimized_plan {
-            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projected_schema,
-                    ..
-                }) => {
-                    assert_eq!(source.schema().fields().len(), 3);
-                    assert_eq!(projected_schema.fields().len(), 1);
-                }
-                _ => panic!("input to projection should be TableScan"),
-            },
-            _ => panic!("expect optimized_plan to be projection"),
-        }
-
-        let expected = "Projection: #test.c2\
-        \n  TableScan: test projection=Some([1])";
-        assert_eq!(format!("{:?}", optimized_plan), expected);
-
-        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
-        assert_eq!(1, physical_plan.schema().fields().len());
-        assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
-
-        let batches = collect(physical_plan, runtime).await?;
-        assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn preserve_nullability_on_projection() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, 1).await?;
-
-        let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
-        assert!(!schema.field_with_name("c1")?.is_nullable());
-
-        let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
-            .project(vec![col("c1")])?
-            .build()?;
-
-        let plan = ctx.optimize(&plan)?;
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
-        assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn projection_on_memory_scan() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-            Field::new("c", DataType::Int32, false),
-        ]);
-        let schema = SchemaRef::new(schema);
-
-        let partitions = vec![vec![RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
-                Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
-                Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
-            ],
-        )?]];
-
-        let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
-            .project(vec![col("b")])?
-            .build()?;
-        assert_fields_eq(&plan, vec!["b"]);
-
-        let ctx = ExecutionContext::new();
-        let optimized_plan = ctx.optimize(&plan)?;
-        match &optimized_plan {
-            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projected_schema,
-                    ..
-                }) => {
-                    assert_eq!(source.schema().fields().len(), 3);
-                    assert_eq!(projected_schema.fields().len(), 1);
-                }
-                _ => panic!("input to projection should be InMemoryScan"),
-            },
-            _ => panic!("expect optimized_plan to be projection"),
-        }
-
-        let expected = format!(
-            "Projection: #{}.b\
-        \n  TableScan: {} projection=Some([1])",
-            UNNAMED_TABLE, UNNAMED_TABLE
-        );
-        assert_eq!(format!("{:?}", optimized_plan), expected);
-
-        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
-        assert_eq!(1, physical_plan.schema().fields().len());
-        assert_eq!("b", physical_plan.schema().field(0).name().as_str());
-
-        let runtime = ctx.state.lock().runtime_env.clone();
-        let batches = collect(physical_plan, runtime).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(4, batches[0].num_rows());
-
-        Ok(())
-    }
-
-    #[tokio::test]
     async fn sort() -> Result<()> {
         let results =
             execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC", 4).await?;
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index 95623d4..468762e 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -98,6 +98,7 @@ pub mod window;
 
 mod explain;
 pub mod information_schema;
+mod partitioned_csv;
 #[cfg_attr(not(feature = "unicode_expressions"), ignore)]
 pub mod unicode;
 
diff --git a/datafusion/tests/sql/partitioned_csv.rs b/datafusion/tests/sql/partitioned_csv.rs
new file mode 100644
index 0000000..5efc837
--- /dev/null
+++ b/datafusion/tests/sql/partitioned_csv.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility functions for running with a partitioned csv dataset:
+
+use std::{io::Write, sync::Arc};
+
+use arrow::{
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use datafusion::{
+    error::Result,
+    prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext},
+};
+use tempfile::TempDir;
+
+/// Execute SQL and return results
+async fn plan_and_collect(
+    ctx: &mut ExecutionContext,
+    sql: &str,
+) -> Result<Vec<RecordBatch>> {
+    ctx.sql(sql).await?.collect().await
+}
+
+/// Execute SQL and return results
+pub async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
+    let tmp_dir = TempDir::new()?;
+    let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
+    plan_and_collect(&mut ctx, sql).await
+}
+
+/// Generate CSV partitions within the supplied directory
+fn populate_csv_partitions(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+    file_extension: &str,
+) -> Result<SchemaRef> {
+    // define schema for data source (csv file)
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::UInt32, false),
+        Field::new("c2", DataType::UInt64, false),
+        Field::new("c3", DataType::Boolean, false),
+    ]));
+
+    // generate a partitioned file
+    for partition in 0..partition_count {
+        let filename = format!("partition-{}.{}", partition, file_extension);
+        let file_path = tmp_dir.path().join(&filename);
+        let mut file = std::fs::File::create(file_path)?;
+
+        // generate some data
+        for i in 0..=10 {
+            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+            file.write_all(data.as_bytes())?;
+        }
+    }
+
+    Ok(schema)
+}
+
+/// Generate a partitioned CSV file and register it with an execution context
+pub async fn create_ctx(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+) -> Result<ExecutionContext> {
+    let mut ctx =
+        ExecutionContext::with_config(ExecutionConfig::new().with_target_partitions(8));
+
+    let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
+    // register csv file with the execution context
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema),
+    )
+    .await?;
+
+    Ok(ctx)
+}
diff --git a/datafusion/tests/sql/projection.rs b/datafusion/tests/sql/projection.rs
index 57fa598..0a956a9 100644
--- a/datafusion/tests/sql/projection.rs
+++ b/datafusion/tests/sql/projection.rs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
+use tempfile::TempDir;
+
 use super::*;
 
 #[tokio::test]
@@ -73,3 +76,192 @@ async fn csv_query_group_by_avg_with_projection() -> Result<()> {
     assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn parallel_projection() -> Result<()> {
+    let partition_count = 4;
+    let results =
+        partitioned_csv::execute("SELECT c1, c2 FROM test", partition_count).await?;
+
+    let expected = vec![
+        "+----+----+",
+        "| c1 | c2 |",
+        "+----+----+",
+        "| 3  | 1  |",
+        "| 3  | 2  |",
+        "| 3  | 3  |",
+        "| 3  | 4  |",
+        "| 3  | 5  |",
+        "| 3  | 6  |",
+        "| 3  | 7  |",
+        "| 3  | 8  |",
+        "| 3  | 9  |",
+        "| 3  | 10 |",
+        "| 2  | 1  |",
+        "| 2  | 2  |",
+        "| 2  | 3  |",
+        "| 2  | 4  |",
+        "| 2  | 5  |",
+        "| 2  | 6  |",
+        "| 2  | 7  |",
+        "| 2  | 8  |",
+        "| 2  | 9  |",
+        "| 2  | 10 |",
+        "| 1  | 1  |",
+        "| 1  | 2  |",
+        "| 1  | 3  |",
+        "| 1  | 4  |",
+        "| 1  | 5  |",
+        "| 1  | 6  |",
+        "| 1  | 7  |",
+        "| 1  | 8  |",
+        "| 1  | 9  |",
+        "| 1  | 10 |",
+        "| 0  | 1  |",
+        "| 0  | 2  |",
+        "| 0  | 3  |",
+        "| 0  | 4  |",
+        "| 0  | 5  |",
+        "| 0  | 6  |",
+        "| 0  | 7  |",
+        "| 0  | 8  |",
+        "| 0  | 9  |",
+        "| 0  | 10 |",
+        "+----+----+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn projection_on_table_scan() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let partition_count = 4;
+    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+    let runtime = ctx.state.lock().runtime_env.clone();
+
+    let table = ctx.table("test")?;
+    let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
+        .project(vec![col("c2")])?
+        .build()?;
+
+    let optimized_plan = ctx.optimize(&logical_plan)?;
+    match &optimized_plan {
+        LogicalPlan::Projection(Projection { input, .. }) => match &**input {
+            LogicalPlan::TableScan(TableScan {
+                source,
+                projected_schema,
+                ..
+            }) => {
+                assert_eq!(source.schema().fields().len(), 3);
+                assert_eq!(projected_schema.fields().len(), 1);
+            }
+            _ => panic!("input to projection should be TableScan"),
+        },
+        _ => panic!("expect optimized_plan to be projection"),
+    }
+
+    let expected = "Projection: #test.c2\
+                    \n  TableScan: test projection=Some([1])";
+    assert_eq!(format!("{:?}", optimized_plan), expected);
+
+    let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+
+    assert_eq!(1, physical_plan.schema().fields().len());
+    assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
+
+    let batches = collect(physical_plan, runtime).await?;
+    assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn preserve_nullability_on_projection() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = partitioned_csv::create_ctx(&tmp_dir, 1).await?;
+
+    let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
+    assert!(!schema.field_with_name("c1")?.is_nullable());
+
+    let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
+        .project(vec![col("c1")])?
+        .build()?;
+
+    let plan = ctx.optimize(&plan)?;
+    let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
+    assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
+    Ok(())
+}
+
+#[tokio::test]
+async fn projection_on_memory_scan() -> Result<()> {
+    let schema = Schema::new(vec![
+        Field::new("a", DataType::Int32, false),
+        Field::new("b", DataType::Int32, false),
+        Field::new("c", DataType::Int32, false),
+    ]);
+    let schema = SchemaRef::new(schema);
+
+    let partitions = vec![vec![RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
+            Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
+            Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
+        ],
+    )?]];
+
+    let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
+        .project(vec![col("b")])?
+        .build()?;
+    assert_fields_eq(&plan, vec!["b"]);
+
+    let ctx = ExecutionContext::new();
+    let optimized_plan = ctx.optimize(&plan)?;
+    match &optimized_plan {
+        LogicalPlan::Projection(Projection { input, .. }) => match &**input {
+            LogicalPlan::TableScan(TableScan {
+                source,
+                projected_schema,
+                ..
+            }) => {
+                assert_eq!(source.schema().fields().len(), 3);
+                assert_eq!(projected_schema.fields().len(), 1);
+            }
+            _ => panic!("input to projection should be InMemoryScan"),
+        },
+        _ => panic!("expect optimized_plan to be projection"),
+    }
+
+    let expected = format!(
+        "Projection: #{}.b\
+         \n  TableScan: {} projection=Some([1])",
+        UNNAMED_TABLE, UNNAMED_TABLE
+    );
+    assert_eq!(format!("{:?}", optimized_plan), expected);
+
+    let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+
+    assert_eq!(1, physical_plan.schema().fields().len());
+    assert_eq!("b", physical_plan.schema().field(0).name().as_str());
+
+    let runtime = ctx.state.lock().runtime_env.clone();
+    let batches = collect(physical_plan, runtime).await?;
+    assert_eq!(1, batches.len());
+    assert_eq!(1, batches[0].num_columns());
+    assert_eq!(4, batches[0].num_rows());
+
+    Ok(())
+}
+
+fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
+    let actual: Vec<String> = plan
+        .schema()
+        .fields()
+        .iter()
+        .map(|f| f.name().clone())
+        .collect();
+    assert_eq!(actual, expected);
+}
diff --git a/datafusion/tests/sql/select.rs b/datafusion/tests/sql/select.rs
index 759a45c..02869dd 100644
--- a/datafusion/tests/sql/select.rs
+++ b/datafusion/tests/sql/select.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use super::*;
-use datafusion::from_slice::FromSlice;
+use datafusion::{from_slice::FromSlice, physical_plan::collect_partitioned};
+use tempfile::TempDir;
 
 #[tokio::test]
 async fn all_where_empty() -> Result<()> {
@@ -928,3 +929,59 @@ async fn csv_select_nested() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn parallel_query_with_filter() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let partition_count = 4;
+    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+
+    let logical_plan =
+        ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
+    let logical_plan = ctx.optimize(&logical_plan)?;
+
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
+
+    let runtime = ctx.state.lock().runtime_env.clone();
+    let results = collect_partitioned(physical_plan, runtime).await?;
+
+    // note that the order of partitions is not deterministic
+    let mut num_rows = 0;
+    for partition in &results {
+        for batch in partition {
+            num_rows += batch.num_rows();
+        }
+    }
+    assert_eq!(20, num_rows);
+
+    let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
+    let expected = vec![
+        "+----+----+",
+        "| c1 | c2 |",
+        "+----+----+",
+        "| 1  | 1  |",
+        "| 1  | 10 |",
+        "| 1  | 2  |",
+        "| 1  | 3  |",
+        "| 1  | 4  |",
+        "| 1  | 5  |",
+        "| 1  | 6  |",
+        "| 1  | 7  |",
+        "| 1  | 8  |",
+        "| 1  | 9  |",
+        "| 2  | 1  |",
+        "| 2  | 10 |",
+        "| 2  | 2  |",
+        "| 2  | 3  |",
+        "| 2  | 4  |",
+        "| 2  | 5  |",
+        "| 2  | 6  |",
+        "| 2  | 7  |",
+        "| 2  | 8  |",
+        "| 2  | 9  |",
+        "+----+----+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}

Reply via email to