This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ab145c8 Move timestamp related tests out of context.rs and into sql
integration test (#1696)
ab145c8 is described below
commit ab145c801e17cd9d7f9be820f92cfa61ed086df1
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 28 13:22:04 2022 -0500
Move timestamp related tests out of context.rs and into sql integration
test (#1696)
* Move some tests out of context.rs and into the sql integration tests
* Move timestamp test support code out of src/test/mod.rs and into the sql integration tests
* Fix up tests and make them compile
---
datafusion/src/execution/context.rs | 165 ------------------------------------
datafusion/src/test/mod.rs | 92 +-------------------
datafusion/tests/sql/aggregates.rs | 102 ++++++++++++++++++++++
datafusion/tests/sql/joins.rs | 48 +++++++++++
datafusion/tests/sql/mod.rs | 98 ++++++++++++++++++++-
5 files changed, 247 insertions(+), 258 deletions(-)
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index 9cc54df..6ed8223 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -2266,121 +2266,6 @@ mod tests {
}
#[tokio::test]
- async fn aggregate_timestamps_sum() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_timestamps())
- .unwrap();
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t",
- )
- .await
- .unwrap_err();
-
- assert_eq!(results.to_string(), "Error during planning: The function
Sum does not support inputs of type Timestamp(Nanosecond, None).");
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_timestamps_count() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_timestamps())
- .unwrap();
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT count(nanos), count(micros), count(millis), count(secs)
FROM t",
- )
- .await
- .unwrap();
-
- let expected = vec![
-
"+----------------+-----------------+-----------------+---------------+",
- "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) |
COUNT(t.secs) |",
-
"+----------------+-----------------+-----------------+---------------+",
- "| 3 | 3 | 3 | 3
|",
-
"+----------------+-----------------+-----------------+---------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_timestamps_min() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_timestamps())
- .unwrap();
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t",
- )
- .await
- .unwrap();
-
- let expected = vec![
-
"+----------------------------+----------------------------+-------------------------+---------------------+",
- "| MIN(t.nanos) | MIN(t.micros) |
MIN(t.millis) | MIN(t.secs) |",
-
"+----------------------------+----------------------------+-------------------------+---------------------+",
- "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 |
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |",
-
"+----------------------------+----------------------------+-------------------------+---------------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_timestamps_max() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_timestamps())
- .unwrap();
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t",
- )
- .await
- .unwrap();
-
- let expected = vec![
-
"+-------------------------+-------------------------+-------------------------+---------------------+",
- "| MAX(t.nanos) | MAX(t.micros) |
MAX(t.millis) | MAX(t.secs) |",
-
"+-------------------------+-------------------------+-------------------------+---------------------+",
- "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01
05:11:10.432 | 2021-01-01 05:11:10 |",
-
"+-------------------------+-------------------------+-------------------------+---------------------+",
- ];
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn aggregate_timestamps_avg() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_timestamps())
- .unwrap();
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
- )
- .await
- .unwrap_err();
-
- assert_eq!(results.to_string(), "Error during planning: The function
Avg does not support inputs of type Timestamp(Nanosecond, None).");
- Ok(())
- }
-
- #[tokio::test]
async fn aggregate_avg_add() -> Result<()> {
let results = execute(
"SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
@@ -2419,56 +2304,6 @@ mod tests {
}
#[tokio::test]
- async fn join_timestamp() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 1).await?;
- ctx.register_table("t", test::table_with_timestamps())
- .unwrap();
-
- let expected = vec![
-
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
- "| nanos | micros |
millis | secs | name | nanos
| micros | millis | secs
| name |",
-
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
- "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 |
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13
11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 |
2011-12-13 11:13:10 | Row 1 |",
- "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 |
2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13
17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 |
2018-11-13 17:11:10 | Row 0 |",
- "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 |
2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432
| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01
05:11:10 | Row 3 |",
-
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
- ];
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t) as t2 \
- ON t1.nanos = t2.nanos",
- )
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t) as t2 \
- ON t1.micros = t2.micros",
- )
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
-
- let results = plan_and_collect(
- &mut ctx,
- "SELECT * FROM t as t1 \
- JOIN (SELECT * FROM t) as t2 \
- ON t1.millis = t2.millis",
- )
- .await
- .unwrap();
- assert_batches_sorted_eq!(expected, &results);
-
- Ok(())
- }
-
- #[tokio::test]
async fn count_basic() -> Result<()> {
let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test",
1).await?;
assert_eq!(results.len(), 1);
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 844d031..497bfe5 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -22,10 +22,7 @@ use crate::datasource::{MemTable, PartitionedFile,
TableProvider};
use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
-use array::{
- Array, ArrayRef, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray,
- TimestampNanosecondArray, TimestampSecondArray,
-};
+use array::{Array, ArrayRef};
use arrow::array::{self, DecimalBuilder, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
@@ -185,14 +182,6 @@ pub fn make_partition(sz: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![arr]).unwrap()
}
-/// Return a new table provider containing all of the supported timestamp types
-pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
- let batch = make_timestamps();
- let schema = batch.schema();
- let partitions = vec![vec![batch]];
- Arc::new(MemTable::try_new(schema, partitions).unwrap())
-}
-
/// Return a new table which provide this decimal column
pub fn table_with_decimal() -> Arc<dyn TableProvider> {
let batch_decimal = make_decimal();
@@ -214,85 +203,6 @@ fn make_decimal() -> RecordBatch {
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
}
-/// Return record batch with all of the supported timestamp types
-/// values
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "secs" --> TimestampSecondArray
-/// "names" --> StringArray
-pub fn make_timestamps() -> RecordBatch {
- let ts_strings = vec![
- Some("2018-11-13T17:11:10.011375885995"),
- Some("2011-12-13T11:13:10.12345"),
- None,
- Some("2021-1-1T05:11:10.432"),
- ];
-
- let ts_nanos = ts_strings
- .into_iter()
- .map(|t| {
- t.map(|t| {
- t.parse::<chrono::NaiveDateTime>()
- .unwrap()
- .timestamp_nanos()
- })
- })
- .collect::<Vec<_>>();
-
- let ts_micros = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
- .collect::<Vec<_>>();
-
- let ts_millis = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
- .collect::<Vec<_>>();
-
- let ts_secs = ts_nanos
- .iter()
- .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
- .collect::<Vec<_>>();
-
- let names = ts_nanos
- .iter()
- .enumerate()
- .map(|(i, _)| format!("Row {}", i))
- .collect::<Vec<_>>();
-
- let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
- let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
- let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
- let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);
-
- let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
- let arr_names = StringArray::from(names);
-
- let schema = Schema::new(vec![
- Field::new("nanos", arr_nanos.data_type().clone(), true),
- Field::new("micros", arr_micros.data_type().clone(), true),
- Field::new("millis", arr_millis.data_type().clone(), true),
- Field::new("secs", arr_secs.data_type().clone(), true),
- Field::new("name", arr_names.data_type().clone(), true),
- ]);
- let schema = Arc::new(schema);
-
- RecordBatch::try_new(
- schema,
- vec![
- Arc::new(arr_nanos),
- Arc::new(arr_micros),
- Arc::new(arr_millis),
- Arc::new(arr_secs),
- Arc::new(arr_names),
- ],
- )
- .unwrap()
-}
-
/// Asserts that given future is pending.
pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> +
Send + 'a>>) {
let waker = futures::task::noop_waker();
diff --git a/datafusion/tests/sql/aggregates.rs
b/datafusion/tests/sql/aggregates.rs
index 9d72752..2d42870 100644
--- a/datafusion/tests/sql/aggregates.rs
+++ b/datafusion/tests/sql/aggregates.rs
@@ -473,3 +473,105 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn aggregate_timestamps_sum() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("t", table_with_timestamps()).unwrap();
+
+ let results = plan_and_collect(
+ &mut ctx,
+ "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t",
+ )
+ .await
+ .unwrap_err();
+
+ assert_eq!(results.to_string(), "Error during planning: The function Sum
does not support inputs of type Timestamp(Nanosecond, None).");
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_count() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("t", table_with_timestamps()).unwrap();
+
+ let results = execute_to_batches(
+ &mut ctx,
+ "SELECT count(nanos), count(micros), count(millis), count(secs) FROM
t",
+ )
+ .await;
+
+ let expected = vec![
+
"+----------------+-----------------+-----------------+---------------+",
+ "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs)
|",
+
"+----------------+-----------------+-----------------+---------------+",
+ "| 3 | 3 | 3 | 3
|",
+
"+----------------+-----------------+-----------------+---------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_min() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("t", table_with_timestamps()).unwrap();
+
+ let results = execute_to_batches(
+ &mut ctx,
+ "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t",
+ )
+ .await;
+
+ let expected = vec![
+
"+----------------------------+----------------------------+-------------------------+---------------------+",
+ "| MIN(t.nanos) | MIN(t.micros) |
MIN(t.millis) | MIN(t.secs) |",
+
"+----------------------------+----------------------------+-------------------------+---------------------+",
+ "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 |
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |",
+
"+----------------------------+----------------------------+-------------------------+---------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_max() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("t", table_with_timestamps()).unwrap();
+
+ let results = execute_to_batches(
+ &mut ctx,
+ "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t",
+ )
+ .await;
+
+ let expected = vec![
+
"+-------------------------+-------------------------+-------------------------+---------------------+",
+ "| MAX(t.nanos) | MAX(t.micros) | MAX(t.millis)
| MAX(t.secs) |",
+
"+-------------------------+-------------------------+-------------------------+---------------------+",
+ "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01
05:11:10.432 | 2021-01-01 05:11:10 |",
+
"+-------------------------+-------------------------+-------------------------+---------------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_avg() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("t", table_with_timestamps()).unwrap();
+
+ let results = plan_and_collect(
+ &mut ctx,
+ "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
+ )
+ .await
+ .unwrap_err();
+
+ assert_eq!(results.to_string(), "Error during planning: The function Avg
does not support inputs of type Timestamp(Nanosecond, None).");
+ Ok(())
+}
diff --git a/datafusion/tests/sql/joins.rs b/datafusion/tests/sql/joins.rs
index 70d824b..04436ed 100644
--- a/datafusion/tests/sql/joins.rs
+++ b/datafusion/tests/sql/joins.rs
@@ -882,3 +882,51 @@ async fn
join_tables_with_duplicated_column_name_not_in_on_constraint() -> Resul
assert_batches_eq!(expected, &actual);
Ok(())
}
+
+#[tokio::test]
+async fn join_timestamp() -> Result<()> {
+ let mut ctx = ExecutionContext::new();
+ ctx.register_table("t", table_with_timestamps()).unwrap();
+
+ let expected = vec![
+
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
+ "| nanos | micros | millis
| secs | name | nanos
| micros | millis | secs |
name |",
+
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
+ "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 |
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13
11:13:10.123450 | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 |
2011-12-13 11:13:10 | Row 1 |",
+ "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 |
2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13
17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 |
2018-11-13 17:11:10 | Row 0 |",
+ "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 |
2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432
| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01
05:11:10 | Row 3 |",
+
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
+ ];
+
+ let results = execute_to_batches(
+ &mut ctx,
+ "SELECT * FROM t as t1 \
+ JOIN (SELECT * FROM t) as t2 \
+ ON t1.nanos = t2.nanos",
+ )
+ .await;
+
+ assert_batches_sorted_eq!(expected, &results);
+
+ let results = execute_to_batches(
+ &mut ctx,
+ "SELECT * FROM t as t1 \
+ JOIN (SELECT * FROM t) as t2 \
+ ON t1.micros = t2.micros",
+ )
+ .await;
+
+ assert_batches_sorted_eq!(expected, &results);
+
+ let results = execute_to_batches(
+ &mut ctx,
+ "SELECT * FROM t as t1 \
+ JOIN (SELECT * FROM t) as t2 \
+ ON t1.millis = t2.millis",
+ )
+ .await;
+
+ assert_batches_sorted_eq!(expected, &results);
+
+ Ok(())
+}
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index f2496c3..90fe513 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -521,8 +521,15 @@ async fn register_aggregate_csv(ctx: &mut
ExecutionContext) -> Result<()> {
Ok(())
}
-/// Execute query and return result set as 2-d table of Vecs
-/// `result[row][column]`
+/// Execute SQL and return results as a Vec of RecordBatches, returning any error
+async fn plan_and_collect(
+ ctx: &mut ExecutionContext,
+ sql: &str,
+) -> Result<Vec<RecordBatch>> {
+ ctx.sql(sql).await?.collect().await
+}
+
+/// Execute query and return results as a Vec of RecordBatches
async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) ->
Vec<RecordBatch> {
let msg = format!("Creating logical plan for '{}'", sql);
let plan = ctx.create_logical_plan(sql).expect(&msg);
@@ -734,6 +741,93 @@ fn normalize_vec_for_explain(v: Vec<Vec<String>>) ->
Vec<Vec<String>> {
.collect::<Vec<_>>()
}
+/// Return a new table provider containing all of the supported timestamp types
+pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
+ let batch = make_timestamps();
+ let schema = batch.schema();
+ let partitions = vec![vec![batch]];
+ Arc::new(MemTable::try_new(schema, partitions).unwrap())
+}
+
+/// Return a record batch containing values of all of the supported
+/// timestamp types
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "secs" --> TimestampSecondArray
+/// "name" --> StringArray
+pub fn make_timestamps() -> RecordBatch {
+ let ts_strings = vec![
+ Some("2018-11-13T17:11:10.011375885995"),
+ Some("2011-12-13T11:13:10.12345"),
+ None,
+ Some("2021-1-1T05:11:10.432"),
+ ];
+
+ let ts_nanos = ts_strings
+ .into_iter()
+ .map(|t| {
+ t.map(|t| {
+ t.parse::<chrono::NaiveDateTime>()
+ .unwrap()
+ .timestamp_nanos()
+ })
+ })
+ .collect::<Vec<_>>();
+
+ let ts_micros = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+ .collect::<Vec<_>>();
+
+ let ts_millis = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+ .collect::<Vec<_>>();
+
+ let ts_secs = ts_nanos
+ .iter()
+ .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+ .collect::<Vec<_>>();
+
+ let names = ts_nanos
+ .iter()
+ .enumerate()
+ .map(|(i, _)| format!("Row {}", i))
+ .collect::<Vec<_>>();
+
+ let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
+ let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
+ let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
+ let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);
+
+ let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+ let arr_names = StringArray::from(names);
+
+ let schema = Schema::new(vec![
+ Field::new("nanos", arr_nanos.data_type().clone(), true),
+ Field::new("micros", arr_micros.data_type().clone(), true),
+ Field::new("millis", arr_millis.data_type().clone(), true),
+ Field::new("secs", arr_secs.data_type().clone(), true),
+ Field::new("name", arr_names.data_type().clone(), true),
+ ]);
+ let schema = Arc::new(schema);
+
+ RecordBatch::try_new(
+ schema,
+ vec![
+ Arc::new(arr_nanos),
+ Arc::new(arr_micros),
+ Arc::new(arr_millis),
+ Arc::new(arr_secs),
+ Arc::new(arr_names),
+ ],
+ )
+ .unwrap()
+}
+
#[tokio::test]
async fn nyc() -> Result<()> {
// schema for nyxtaxi csv files