This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ab145c8  Move timestamp related tests out of context.rs and into sql 
integration test (#1696)
ab145c8 is described below

commit ab145c801e17cd9d7f9be820f92cfa61ed086df1
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 28 13:22:04 2022 -0500

    Move timestamp related tests out of context.rs and into sql integration 
test (#1696)
    
    * Move some tests out of context.rs and into sql
    
    * Move support test out of context.rs and into sql tests
    
    * Fixup tests and make them compile
---
 datafusion/src/execution/context.rs | 165 ------------------------------------
 datafusion/src/test/mod.rs          |  92 +-------------------
 datafusion/tests/sql/aggregates.rs  | 102 ++++++++++++++++++++++
 datafusion/tests/sql/joins.rs       |  48 +++++++++++
 datafusion/tests/sql/mod.rs         |  98 ++++++++++++++++++++-
 5 files changed, 247 insertions(+), 258 deletions(-)

diff --git a/datafusion/src/execution/context.rs 
b/datafusion/src/execution/context.rs
index 9cc54df..6ed8223 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -2266,121 +2266,6 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn aggregate_timestamps_sum() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_timestamps())
-            .unwrap();
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t",
-        )
-        .await
-        .unwrap_err();
-
-        assert_eq!(results.to_string(), "Error during planning: The function 
Sum does not support inputs of type Timestamp(Nanosecond, None).");
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_timestamps_count() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_timestamps())
-            .unwrap();
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT count(nanos), count(micros), count(millis), count(secs) 
FROM t",
-        )
-        .await
-        .unwrap();
-
-        let expected = vec![
-            
"+----------------+-----------------+-----------------+---------------+",
-            "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | 
COUNT(t.secs) |",
-            
"+----------------+-----------------+-----------------+---------------+",
-            "| 3              | 3               | 3               | 3          
   |",
-            
"+----------------+-----------------+-----------------+---------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_timestamps_min() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_timestamps())
-            .unwrap();
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t",
-        )
-        .await
-        .unwrap();
-
-        let expected = vec![
-            
"+----------------------------+----------------------------+-------------------------+---------------------+",
-            "| MIN(t.nanos)               | MIN(t.micros)              | 
MIN(t.millis)           | MIN(t.secs)         |",
-            
"+----------------------------+----------------------------+-------------------------+---------------------+",
-            "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |",
-            
"+----------------------------+----------------------------+-------------------------+---------------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_timestamps_max() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_timestamps())
-            .unwrap();
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t",
-        )
-        .await
-        .unwrap();
-
-        let expected = vec![
-            
"+-------------------------+-------------------------+-------------------------+---------------------+",
-            "| MAX(t.nanos)            | MAX(t.micros)           | 
MAX(t.millis)           | MAX(t.secs)         |",
-            
"+-------------------------+-------------------------+-------------------------+---------------------+",
-            "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 
05:11:10.432 | 2021-01-01 05:11:10 |",
-            
"+-------------------------+-------------------------+-------------------------+---------------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn aggregate_timestamps_avg() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_timestamps())
-            .unwrap();
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
-        )
-        .await
-        .unwrap_err();
-
-        assert_eq!(results.to_string(), "Error during planning: The function 
Avg does not support inputs of type Timestamp(Nanosecond, None).");
-        Ok(())
-    }
-
-    #[tokio::test]
     async fn aggregate_avg_add() -> Result<()> {
         let results = execute(
             "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test",
@@ -2419,56 +2304,6 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn join_timestamp() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let mut ctx = create_ctx(&tmp_dir, 1).await?;
-        ctx.register_table("t", test::table_with_timestamps())
-            .unwrap();
-
-        let expected = vec![
-            
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
-            "| nanos                         | micros                     | 
millis                  | secs                | name  | nanos                   
      | micros                     | millis                  | secs             
   | name  |",
-            
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
-            "| 2011-12-13 11:13:10.123450    | 2011-12-13 11:13:10.123450 | 
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13 
11:13:10.123450    | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 
2011-12-13 11:13:10 | Row 1 |",
-            "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 
2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13 
17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 
2018-11-13 17:11:10 | Row 0 |",
-            "| 2021-01-01 05:11:10.432       | 2021-01-01 05:11:10.432    | 
2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432 
      | 2021-01-01 05:11:10.432    | 2021-01-01 05:11:10.432 | 2021-01-01 
05:11:10 | Row 3 |",
-            
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
-        ];
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT * FROM t as t1  \
-             JOIN (SELECT * FROM t) as t2 \
-             ON t1.nanos = t2.nanos",
-        )
-        .await
-        .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT * FROM t as t1  \
-             JOIN (SELECT * FROM t) as t2 \
-             ON t1.micros = t2.micros",
-        )
-        .await
-        .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT * FROM t as t1  \
-             JOIN (SELECT * FROM t) as t2 \
-             ON t1.millis = t2.millis",
-        )
-        .await
-        .unwrap();
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
     async fn count_basic() -> Result<()> {
         let results = execute("SELECT COUNT(c1), COUNT(c2) FROM test", 
1).await?;
         assert_eq!(results.len(), 1);
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 844d031..497bfe5 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -22,10 +22,7 @@ use crate::datasource::{MemTable, PartitionedFile, 
TableProvider};
 use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder};
-use array::{
-    Array, ArrayRef, StringArray, TimestampMicrosecondArray, 
TimestampMillisecondArray,
-    TimestampNanosecondArray, TimestampSecondArray,
-};
+use array::{Array, ArrayRef};
 use arrow::array::{self, DecimalBuilder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
@@ -185,14 +182,6 @@ pub fn make_partition(sz: i32) -> RecordBatch {
     RecordBatch::try_new(schema, vec![arr]).unwrap()
 }
 
-/// Return a new table provider containing all of the supported timestamp types
-pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
-    let batch = make_timestamps();
-    let schema = batch.schema();
-    let partitions = vec![vec![batch]];
-    Arc::new(MemTable::try_new(schema, partitions).unwrap())
-}
-
 /// Return a new table which provide this decimal column
 pub fn table_with_decimal() -> Arc<dyn TableProvider> {
     let batch_decimal = make_decimal();
@@ -214,85 +203,6 @@ fn make_decimal() -> RecordBatch {
     RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap()
 }
 
-/// Return  record batch with all of the supported timestamp types
-/// values
-///
-/// Columns are named:
-/// "nanos" --> TimestampNanosecondArray
-/// "micros" --> TimestampMicrosecondArray
-/// "millis" --> TimestampMillisecondArray
-/// "secs" --> TimestampSecondArray
-/// "names" --> StringArray
-pub fn make_timestamps() -> RecordBatch {
-    let ts_strings = vec![
-        Some("2018-11-13T17:11:10.011375885995"),
-        Some("2011-12-13T11:13:10.12345"),
-        None,
-        Some("2021-1-1T05:11:10.432"),
-    ];
-
-    let ts_nanos = ts_strings
-        .into_iter()
-        .map(|t| {
-            t.map(|t| {
-                t.parse::<chrono::NaiveDateTime>()
-                    .unwrap()
-                    .timestamp_nanos()
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let ts_micros = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
-        .collect::<Vec<_>>();
-
-    let ts_millis = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
-        .collect::<Vec<_>>();
-
-    let ts_secs = ts_nanos
-        .iter()
-        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
-        .collect::<Vec<_>>();
-
-    let names = ts_nanos
-        .iter()
-        .enumerate()
-        .map(|(i, _)| format!("Row {}", i))
-        .collect::<Vec<_>>();
-
-    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
-    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
-    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
-    let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);
-
-    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
-    let arr_names = StringArray::from(names);
-
-    let schema = Schema::new(vec![
-        Field::new("nanos", arr_nanos.data_type().clone(), true),
-        Field::new("micros", arr_micros.data_type().clone(), true),
-        Field::new("millis", arr_millis.data_type().clone(), true),
-        Field::new("secs", arr_secs.data_type().clone(), true),
-        Field::new("name", arr_names.data_type().clone(), true),
-    ]);
-    let schema = Arc::new(schema);
-
-    RecordBatch::try_new(
-        schema,
-        vec![
-            Arc::new(arr_nanos),
-            Arc::new(arr_micros),
-            Arc::new(arr_millis),
-            Arc::new(arr_secs),
-            Arc::new(arr_names),
-        ],
-    )
-    .unwrap()
-}
-
 /// Asserts that given future is pending.
 pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + 
Send + 'a>>) {
     let waker = futures::task::noop_waker();
diff --git a/datafusion/tests/sql/aggregates.rs 
b/datafusion/tests/sql/aggregates.rs
index 9d72752..2d42870 100644
--- a/datafusion/tests/sql/aggregates.rs
+++ b/datafusion/tests/sql/aggregates.rs
@@ -473,3 +473,105 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn aggregate_timestamps_sum() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
+
+    let results = plan_and_collect(
+        &mut ctx,
+        "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t",
+    )
+    .await
+    .unwrap_err();
+
+    assert_eq!(results.to_string(), "Error during planning: The function Sum 
does not support inputs of type Timestamp(Nanosecond, None).");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_count() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
+
+    let results = execute_to_batches(
+        &mut ctx,
+        "SELECT count(nanos), count(micros), count(millis), count(secs) FROM 
t",
+    )
+    .await;
+
+    let expected = vec![
+        
"+----------------+-----------------+-----------------+---------------+",
+        "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) 
|",
+        
"+----------------+-----------------+-----------------+---------------+",
+        "| 3              | 3               | 3               | 3             
|",
+        
"+----------------+-----------------+-----------------+---------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_min() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
+
+    let results = execute_to_batches(
+        &mut ctx,
+        "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t",
+    )
+    .await;
+
+    let expected = vec![
+        
"+----------------------------+----------------------------+-------------------------+---------------------+",
+        "| MIN(t.nanos)               | MIN(t.micros)              | 
MIN(t.millis)           | MIN(t.secs)         |",
+        
"+----------------------------+----------------------------+-------------------------+---------------------+",
+        "| 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123450 | 
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 |",
+        
"+----------------------------+----------------------------+-------------------------+---------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_max() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
+
+    let results = execute_to_batches(
+        &mut ctx,
+        "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t",
+    )
+    .await;
+
+    let expected = vec![
+        
"+-------------------------+-------------------------+-------------------------+---------------------+",
+        "| MAX(t.nanos)            | MAX(t.micros)           | MAX(t.millis)   
        | MAX(t.secs)         |",
+        
"+-------------------------+-------------------------+-------------------------+---------------------+",
+        "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 
05:11:10.432 | 2021-01-01 05:11:10 |",
+        
"+-------------------------+-------------------------+-------------------------+---------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn aggregate_timestamps_avg() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
+
+    let results = plan_and_collect(
+        &mut ctx,
+        "SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t",
+    )
+    .await
+    .unwrap_err();
+
+    assert_eq!(results.to_string(), "Error during planning: The function Avg 
does not support inputs of type Timestamp(Nanosecond, None).");
+    Ok(())
+}
diff --git a/datafusion/tests/sql/joins.rs b/datafusion/tests/sql/joins.rs
index 70d824b..04436ed 100644
--- a/datafusion/tests/sql/joins.rs
+++ b/datafusion/tests/sql/joins.rs
@@ -882,3 +882,51 @@ async fn 
join_tables_with_duplicated_column_name_not_in_on_constraint() -> Resul
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn join_timestamp() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    ctx.register_table("t", table_with_timestamps()).unwrap();
+
+    let expected = vec![
+        
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
+        "| nanos                         | micros                     | millis 
                 | secs                | name  | nanos                         
| micros                     | millis                  | secs                | 
name  |",
+        
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
+        "| 2011-12-13 11:13:10.123450    | 2011-12-13 11:13:10.123450 | 
2011-12-13 11:13:10.123 | 2011-12-13 11:13:10 | Row 1 | 2011-12-13 
11:13:10.123450    | 2011-12-13 11:13:10.123450 | 2011-12-13 11:13:10.123 | 
2011-12-13 11:13:10 | Row 1 |",
+        "| 2018-11-13 17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 
2018-11-13 17:11:10.011 | 2018-11-13 17:11:10 | Row 0 | 2018-11-13 
17:11:10.011375885 | 2018-11-13 17:11:10.011375 | 2018-11-13 17:11:10.011 | 
2018-11-13 17:11:10 | Row 0 |",
+        "| 2021-01-01 05:11:10.432       | 2021-01-01 05:11:10.432    | 
2021-01-01 05:11:10.432 | 2021-01-01 05:11:10 | Row 3 | 2021-01-01 05:11:10.432 
      | 2021-01-01 05:11:10.432    | 2021-01-01 05:11:10.432 | 2021-01-01 
05:11:10 | Row 3 |",
+        
"+-------------------------------+----------------------------+-------------------------+---------------------+-------+-------------------------------+----------------------------+-------------------------+---------------------+-------+",
+    ];
+
+    let results = execute_to_batches(
+        &mut ctx,
+        "SELECT * FROM t as t1  \
+         JOIN (SELECT * FROM t) as t2 \
+         ON t1.nanos = t2.nanos",
+    )
+    .await;
+
+    assert_batches_sorted_eq!(expected, &results);
+
+    let results = execute_to_batches(
+        &mut ctx,
+        "SELECT * FROM t as t1  \
+         JOIN (SELECT * FROM t) as t2 \
+         ON t1.micros = t2.micros",
+    )
+    .await;
+
+    assert_batches_sorted_eq!(expected, &results);
+
+    let results = execute_to_batches(
+        &mut ctx,
+        "SELECT * FROM t as t1  \
+         JOIN (SELECT * FROM t) as t2 \
+         ON t1.millis = t2.millis",
+    )
+    .await;
+
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index f2496c3..90fe513 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -521,8 +521,15 @@ async fn register_aggregate_csv(ctx: &mut 
ExecutionContext) -> Result<()> {
     Ok(())
 }
 
-/// Execute query and return result set as 2-d table of Vecs
-/// `result[row][column]`
+/// Execute SQL and return results as a RecordBatch
+async fn plan_and_collect(
+    ctx: &mut ExecutionContext,
+    sql: &str,
+) -> Result<Vec<RecordBatch>> {
+    ctx.sql(sql).await?.collect().await
+}
+
+/// Execute query and return results as a Vec of RecordBatches
 async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> 
Vec<RecordBatch> {
     let msg = format!("Creating logical plan for '{}'", sql);
     let plan = ctx.create_logical_plan(sql).expect(&msg);
@@ -734,6 +741,93 @@ fn normalize_vec_for_explain(v: Vec<Vec<String>>) -> 
Vec<Vec<String>> {
         .collect::<Vec<_>>()
 }
 
+/// Return a new table provider containing all of the supported timestamp types
+pub fn table_with_timestamps() -> Arc<dyn TableProvider> {
+    let batch = make_timestamps();
+    let schema = batch.schema();
+    let partitions = vec![vec![batch]];
+    Arc::new(MemTable::try_new(schema, partitions).unwrap())
+}
+
+/// Return  record batch with all of the supported timestamp types
+/// values
+///
+/// Columns are named:
+/// "nanos" --> TimestampNanosecondArray
+/// "micros" --> TimestampMicrosecondArray
+/// "millis" --> TimestampMillisecondArray
+/// "secs" --> TimestampSecondArray
+/// "names" --> StringArray
+pub fn make_timestamps() -> RecordBatch {
+    let ts_strings = vec![
+        Some("2018-11-13T17:11:10.011375885995"),
+        Some("2011-12-13T11:13:10.12345"),
+        None,
+        Some("2021-1-1T05:11:10.432"),
+    ];
+
+    let ts_nanos = ts_strings
+        .into_iter()
+        .map(|t| {
+            t.map(|t| {
+                t.parse::<chrono::NaiveDateTime>()
+                    .unwrap()
+                    .timestamp_nanos()
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let ts_micros = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000))
+        .collect::<Vec<_>>();
+
+    let ts_millis = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000))
+        .collect::<Vec<_>>();
+
+    let ts_secs = ts_nanos
+        .iter()
+        .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000000000))
+        .collect::<Vec<_>>();
+
+    let names = ts_nanos
+        .iter()
+        .enumerate()
+        .map(|(i, _)| format!("Row {}", i))
+        .collect::<Vec<_>>();
+
+    let arr_nanos = TimestampNanosecondArray::from_opt_vec(ts_nanos, None);
+    let arr_micros = TimestampMicrosecondArray::from_opt_vec(ts_micros, None);
+    let arr_millis = TimestampMillisecondArray::from_opt_vec(ts_millis, None);
+    let arr_secs = TimestampSecondArray::from_opt_vec(ts_secs, None);
+
+    let names = names.iter().map(|s| s.as_str()).collect::<Vec<_>>();
+    let arr_names = StringArray::from(names);
+
+    let schema = Schema::new(vec![
+        Field::new("nanos", arr_nanos.data_type().clone(), true),
+        Field::new("micros", arr_micros.data_type().clone(), true),
+        Field::new("millis", arr_millis.data_type().clone(), true),
+        Field::new("secs", arr_secs.data_type().clone(), true),
+        Field::new("name", arr_names.data_type().clone(), true),
+    ]);
+    let schema = Arc::new(schema);
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(arr_nanos),
+            Arc::new(arr_micros),
+            Arc::new(arr_millis),
+            Arc::new(arr_secs),
+            Arc::new(arr_names),
+        ],
+    )
+    .unwrap()
+}
+
 #[tokio::test]
 async fn nyc() -> Result<()> {
     // schema for nyxtaxi csv files

Reply via email to