This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new fa5c286e feat: support INSERT INTO SELECT (#1536)
fa5c286e is described below

commit fa5c286eefb912f2c8c27394dd76a5b497252744
Author: Draco <[email protected]>
AuthorDate: Mon Jul 15 17:15:39 2024 +0800

    feat: support INSERT INTO SELECT (#1536)
    
    ## Rationale
    
    Closes #557.
    
    ## Detailed Changes
    
    When generating the insert logical plan, also generate the select logical 
plan and store it in the insert plan. Then execute the select logical plan in 
the insert interpreter, convert the result records into a RowGroup, and then 
insert it.
    
    ## Test Plan
    
    CI
---
 .../cases/env/local/dml/insert_into_select.result  |  79 +++++++
 .../cases/env/local/dml/insert_into_select.sql     |  57 +++++
 src/interpreters/src/factory.rs                    |   4 +-
 src/interpreters/src/insert.rs                     | 218 ++++++++++++++---
 src/proxy/src/write.rs                             |   4 +-
 src/query_frontend/src/plan.rs                     |  21 +-
 src/query_frontend/src/planner.rs                  | 262 ++++++++++++---------
 7 files changed, 487 insertions(+), 158 deletions(-)

diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result 
b/integration_tests/cases/env/local/dml/insert_into_select.result
new file mode 100644
index 00000000..77b648a2
--- /dev/null
+++ b/integration_tests/cases/env/local/dml/insert_into_select.result
@@ -0,0 +1,79 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+DROP TABLE IF EXISTS `insert_into_select_table1`;
+
+affected_rows: 0
+
+CREATE TABLE `insert_into_select_table1` (
+    `timestamp` timestamp NOT NULL,
+    `value` int,
+    `name` string,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+        enable_ttl='false'
+);
+
+affected_rows: 0
+
+INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
+VALUES
+    (1, 100, "s1"),
+    (2, 200, "s2"),
+    (3, 300, "s3");
+
+affected_rows: 3
+
+DROP TABLE IF EXISTS `insert_into_select_table2`;
+
+affected_rows: 0
+
+CREATE TABLE `insert_into_select_table2` (
+    `timestamp` timestamp NOT NULL,
+    `value` int,
+    `name` string NULL,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+        enable_ttl='false'
+);
+
+affected_rows: 0
+
+INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
+SELECT `timestamp`, `value`
+FROM `insert_into_select_table1`;
+
+affected_rows: 3
+
+SELECT `timestamp`, `value`, `name`
+FROM `insert_into_select_table2`;
+
+timestamp,value,name,
+Timestamp(1),Int32(100),String(""),
+Timestamp(2),Int32(200),String(""),
+Timestamp(3),Int32(300),String(""),
+
+
+DROP TABLE `insert_into_select_table1`;
+
+affected_rows: 0
+
+DROP TABLE `insert_into_select_table2`;
+
+affected_rows: 0
+
diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql 
b/integration_tests/cases/env/local/dml/insert_into_select.sql
new file mode 100644
index 00000000..dbe15760
--- /dev/null
+++ b/integration_tests/cases/env/local/dml/insert_into_select.sql
@@ -0,0 +1,57 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+DROP TABLE IF EXISTS `insert_into_select_table1`;
+
+CREATE TABLE `insert_into_select_table1` (
+    `timestamp` timestamp NOT NULL,
+    `value` int,
+    `name` string,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+        enable_ttl='false'
+);
+
+INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
+VALUES
+    (1, 100, "s1"),
+    (2, 200, "s2"),
+    (3, 300, "s3");
+
+DROP TABLE IF EXISTS `insert_into_select_table2`;
+
+CREATE TABLE `insert_into_select_table2` (
+    `timestamp` timestamp NOT NULL,
+    `value` int,
+    `name` string NULL,
+    timestamp KEY (timestamp)) ENGINE=Analytic
+WITH(
+        enable_ttl='false'
+);
+
+INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
+SELECT `timestamp`, `value`
+FROM `insert_into_select_table1`;
+
+SELECT `timestamp`, `value`, `name`
+FROM `insert_into_select_table2`;
+
+DROP TABLE `insert_into_select_table1`;
+
+DROP TABLE `insert_into_select_table2`;
diff --git a/src/interpreters/src/factory.rs b/src/interpreters/src/factory.rs
index 1b10661c..a47a86b2 100644
--- a/src/interpreters/src/factory.rs
+++ b/src/interpreters/src/factory.rs
@@ -82,7 +82,9 @@ impl Factory {
                 self.physical_planner,
                 self.query_runtime,
             ),
-            Plan::Insert(p) => InsertInterpreter::create(ctx, p),
+            Plan::Insert(p) => {
+                InsertInterpreter::create(ctx, p, self.query_executor, 
self.physical_planner)
+            }
             Plan::Create(p) => {
                 CreateInterpreter::create(ctx, p, self.table_engine, 
self.table_manipulator)
             }
diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index cc455b3f..d6307bf1 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -30,7 +30,8 @@ use common_types::{
     column_block::{ColumnBlock, ColumnBlockBuilder},
     column_schema::ColumnId,
     datum::Datum,
-    row::RowGroup,
+    row::{Row, RowBuilder, RowGroup},
+    schema::Schema,
 };
 use datafusion::{
     common::ToDFSchema,
@@ -42,15 +43,23 @@ use datafusion::{
     },
 };
 use df_operator::visitor::find_columns_by_expr;
+use futures::TryStreamExt;
+use generic_error::{BoxError, GenericError};
 use hash_ext::hash64;
 use macros::define_result;
-use query_frontend::plan::InsertPlan;
-use snafu::{OptionExt, ResultExt, Snafu};
+use query_engine::{executor::ExecutorRef, 
physical_planner::PhysicalPlannerRef};
+use query_frontend::{
+    plan::{InsertPlan, InsertSource, QueryPlan},
+    planner::InsertMode,
+};
+use runtime::Priority;
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use table_engine::table::{TableRef, WriteRequest};
 
 use crate::{
     context::Context,
     interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as 
InterpreterResult},
+    RecordBatchVec,
 };
 
 #[derive(Debug, Snafu)]
@@ -94,6 +103,18 @@ pub enum Error {
     BuildColumnBlock {
         source: common_types::column_block::Error,
     },
+
+    #[snafu(display("Failed to create query context, err:{}", source))]
+    CreateQueryContext { source: crate::context::Error },
+
+    #[snafu(display("Failed to execute select physical plan, msg:{}, err:{}", 
msg, source))]
+    ExecuteSelectPlan { msg: String, source: GenericError },
+
+    #[snafu(display("Failed to build row, err:{}", source))]
+    BuildRow { source: common_types::row::Error },
+
+    #[snafu(display("Record columns not enough, len:{}, index:{}", len, 
index))]
+    RecordColumnsNotEnough { len: usize, index: usize },
 }
 
 define_result!(Error);
@@ -101,11 +122,23 @@ define_result!(Error);
 pub struct InsertInterpreter {
     ctx: Context,
     plan: InsertPlan,
+    executor: ExecutorRef,
+    physical_planner: PhysicalPlannerRef,
 }
 
 impl InsertInterpreter {
-    pub fn create(ctx: Context, plan: InsertPlan) -> InterpreterPtr {
-        Box::new(Self { ctx, plan })
+    pub fn create(
+        ctx: Context,
+        plan: InsertPlan,
+        executor: ExecutorRef,
+        physical_planner: PhysicalPlannerRef,
+    ) -> InterpreterPtr {
+        Box::new(Self {
+            ctx,
+            plan,
+            executor,
+            physical_planner,
+        })
     }
 }
 
@@ -113,19 +146,42 @@ impl InsertInterpreter {
 impl Interpreter for InsertInterpreter {
     async fn execute(mut self: Box<Self>) -> InterpreterResult<Output> {
         // Generate tsid if needed.
-        self.maybe_generate_tsid().context(Insert)?;
         let InsertPlan {
             table,
-            mut rows,
+            source,
             default_value_map,
         } = self.plan;
 
+        let mut rows = match source {
+            InsertSource::Values { row_group } => row_group,
+            InsertSource::Select {
+                query: query_plan,
+                column_index_in_insert,
+            } => {
+                // TODO: support streaming insert
+                let record_batches = exec_select_logical_plan(
+                    self.ctx,
+                    query_plan,
+                    self.executor,
+                    self.physical_planner,
+                )
+                .await
+                .context(Insert)?;
+
+                if record_batches.is_empty() {
+                    return Ok(Output::AffectedRows(0));
+                }
+
+                convert_records_to_row_group(record_batches, 
column_index_in_insert, table.schema())
+                    .context(Insert)?
+            }
+        };
+
+        maybe_generate_tsid(&mut rows).context(Insert)?;
+
         // Fill default values
         fill_default_values(table.clone(), &mut rows, 
&default_value_map).context(Insert)?;
 
-        // Context is unused now
-        let _ctx = self.ctx;
-
         let request = WriteRequest { row_group: rows };
 
         let num_rows = table
@@ -138,42 +194,128 @@ impl Interpreter for InsertInterpreter {
     }
 }
 
-impl InsertInterpreter {
-    fn maybe_generate_tsid(&mut self) -> Result<()> {
-        let schema = self.plan.rows.schema();
-        let tsid_idx = schema.index_of_tsid();
-
-        if let Some(idx) = tsid_idx {
-            // Vec of (`index of tag`, `column id of tag`).
-            let tag_idx_column_ids: Vec<_> = schema
-                .columns()
-                .iter()
-                .enumerate()
-                .filter_map(|(i, column)| {
-                    if column.is_tag {
-                        Some((i, column.id))
-                    } else {
-                        None
-                    }
-                })
-                .collect();
+async fn exec_select_logical_plan(
+    ctx: Context,
+    query_plan: QueryPlan,
+    executor: ExecutorRef,
+    physical_planner: PhysicalPlannerRef,
+) -> Result<RecordBatchVec> {
+    let priority = Priority::High;
+
+    let query_ctx = ctx
+        .new_query_context(priority)
+        .context(CreateQueryContext)?;
+
+    // Create select physical plan.
+    let physical_plan = physical_planner
+        .plan(&query_ctx, query_plan)
+        .await
+        .box_err()
+        .context(ExecuteSelectPlan {
+            msg: "failed to build select physical plan",
+        })?;
 
-            let mut hash_bytes = Vec::new();
-            for i in 0..self.plan.rows.num_rows() {
-                let row = self.plan.rows.get_row_mut(i).unwrap();
+    // Execute select physical plan.
+    let record_batch_stream = executor
+        .execute(&query_ctx, physical_plan)
+        .await
+        .box_err()
+        .context(ExecuteSelectPlan {
+            msg: "failed to execute select physical plan",
+        })?;
 
-                let mut tsid_builder = TsidBuilder::new(&mut hash_bytes);
+    let record_batches =
+        record_batch_stream
+            .try_collect()
+            .await
+            .box_err()
+            .context(ExecuteSelectPlan {
+                msg: "failed to collect select execution results",
+            })?;
 
-                for (idx, column_id) in &tag_idx_column_ids {
-                    tsid_builder.maybe_write_datum(*column_id, &row[*idx])?;
+    Ok(record_batches)
+}
+
+fn convert_records_to_row_group(
+    record_batches: RecordBatchVec,
+    column_index_in_insert: Vec<InsertMode>,
+    schema: Schema,
+) -> Result<RowGroup> {
+    let mut data_rows: Vec<Row> = Vec::new();
+
+    for record in &record_batches {
+        let num_cols = record.num_columns();
+        let num_rows = record.num_rows();
+        for row_idx in 0..num_rows {
+            let mut row_builder = RowBuilder::new(&schema);
+            // For each column in schema, append datum into row builder
+            for (index_opt, column_schema) in 
column_index_in_insert.iter().zip(schema.columns()) {
+                match index_opt {
+                    InsertMode::Direct(index) => {
+                        ensure!(
+                            *index < num_cols,
+                            RecordColumnsNotEnough {
+                                len: num_cols,
+                                index: *index
+                            }
+                        );
+                        let datum = record.column(*index).datum(row_idx);
+                        row_builder = 
row_builder.append_datum(datum).context(BuildRow)?;
+                    }
+                    InsertMode::Null => {
+                        // This is a null column
+                        row_builder = 
row_builder.append_datum(Datum::Null).context(BuildRow)?;
+                    }
+                    InsertMode::Auto => {
+                        // This is an auto generated column, fill by default 
value.
+                        let kind = &column_schema.data_type;
+                        row_builder = row_builder
+                            .append_datum(Datum::empty(kind))
+                            .context(BuildRow)?;
+                    }
                 }
+            }
+            let row = row_builder.finish().context(BuildRow)?;
+            data_rows.push(row);
+        }
+    }
+    RowGroup::try_new(schema, data_rows).context(BuildRow)
+}
+
+fn maybe_generate_tsid(rows: &mut RowGroup) -> Result<()> {
+    let schema = rows.schema();
+    let tsid_idx = schema.index_of_tsid();
 
-                let tsid = tsid_builder.finish();
-                row[idx] = Datum::UInt64(tsid);
+    if let Some(idx) = tsid_idx {
+        // Vec of (`index of tag`, `column id of tag`).
+        let tag_idx_column_ids: Vec<_> = schema
+            .columns()
+            .iter()
+            .enumerate()
+            .filter_map(|(i, column)| {
+                if column.is_tag {
+                    Some((i, column.id))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let mut hash_bytes = Vec::new();
+        for i in 0..rows.num_rows() {
+            let row = rows.get_row_mut(i).unwrap();
+
+            let mut tsid_builder = TsidBuilder::new(&mut hash_bytes);
+
+            for (idx, column_id) in &tag_idx_column_ids {
+                tsid_builder.maybe_write_datum(*column_id, &row[*idx])?;
             }
+
+            let tsid = tsid_builder.finish();
+            row[idx] = Datum::UInt64(tsid);
         }
-        Ok(())
     }
+    Ok(())
 }
 
 struct TsidBuilder<'a> {
diff --git a/src/proxy/src/write.rs b/src/proxy/src/write.rs
index fd8e54bd..0fcd082a 100644
--- a/src/proxy/src/write.rs
+++ b/src/proxy/src/write.rs
@@ -44,7 +44,7 @@ use interpreters::interpreter::Output;
 use logger::{debug, error, info, warn};
 use query_frontend::{
     frontend::{Context as FrontendContext, Frontend},
-    plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan},
+    plan::{AlterTableOperation, AlterTablePlan, InsertPlan, InsertSource, 
Plan},
     planner::{build_column_schema, try_get_data_type_from_value},
     provider::CatalogMetaProvider,
 };
@@ -861,7 +861,7 @@ fn write_table_request_to_insert_plan(
         })?;
     Ok(InsertPlan {
         table,
-        rows: row_group,
+        source: InsertSource::Values { row_group },
         default_value_map: BTreeMap::new(),
     })
 }
diff --git a/src/query_frontend/src/plan.rs b/src/query_frontend/src/plan.rs
index e5db6238..400d01e9 100644
--- a/src/query_frontend/src/plan.rs
+++ b/src/query_frontend/src/plan.rs
@@ -39,7 +39,11 @@ use runtime::Priority;
 use snafu::{OptionExt, Snafu};
 use table_engine::{partition::PartitionInfo, table::TableRef};
 
-use crate::{ast::ShowCreateObject, container::TableContainer, 
planner::get_table_ref};
+use crate::{
+    ast::ShowCreateObject,
+    container::TableContainer,
+    planner::{get_table_ref, InsertMode},
+};
 
 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -305,13 +309,24 @@ pub struct DropTablePlan {
     pub partition_info: Option<PartitionInfo>,
 }
 
+#[derive(Debug)]
+pub enum InsertSource {
+    Values {
+        row_group: RowGroup,
+    },
+    Select {
+        column_index_in_insert: Vec<InsertMode>,
+        query: QueryPlan,
+    },
+}
+
 /// Insert logical plan
 #[derive(Debug)]
 pub struct InsertPlan {
     /// The table to insert
     pub table: TableRef,
-    /// RowGroup to insert
-    pub rows: RowGroup,
+    /// Insert source(could be value literals or select query)
+    pub source: InsertSource,
     /// Column indexes in schema to its default-value-expr which is used to 
fill
     /// values
     pub default_value_map: BTreeMap<usize, DfLogicalExpr>,
diff --git a/src/query_frontend/src/planner.rs 
b/src/query_frontend/src/planner.rs
index b5d0fb50..cec4f75e 100644
--- a/src/query_frontend/src/planner.rs
+++ b/src/query_frontend/src/planner.rs
@@ -80,12 +80,13 @@ use crate::{
     partition::PartitionParser,
     plan::{
         AlterTableOperation, AlterTablePlan, CreateTablePlan, 
DescribeTablePlan, DropTablePlan,
-        ExistsTablePlan, InsertPlan, Plan, QueryPlan, QueryType, 
ShowCreatePlan, ShowPlan,
-        ShowTablesPlan,
+        ExistsTablePlan, InsertPlan, InsertSource, Plan, QueryPlan, QueryType, 
ShowCreatePlan,
+        ShowPlan, ShowTablesPlan,
     },
     promql::{remote_query_to_plan, ColumnNames, Expr as PromExpr, 
RemoteQueryPlan},
     provider::{ContextProviderAdapter, MetaProvider},
 };
+
 // We do not carry backtrace in sql error because it is mainly used in server
 // handler and the error is usually caused by invalid/unsupported sql, which
 // should be easy to find out the reason.
@@ -905,7 +906,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
     }
 
     // REQUIRE: SqlStatement must be INSERT stmt
-    fn insert_to_plan(&self, sql_stmt: SqlStatement) -> Result<Plan> {
+    fn insert_to_plan(self, sql_stmt: SqlStatement) -> Result<Plan> {
         match sql_stmt {
             SqlStatement::Insert {
                 table_name,
@@ -996,11 +997,16 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
                     }
                 }
 
-                let rows = build_row_group(schema, source, 
column_index_in_insert)?;
+                let source = build_insert_source(
+                    schema,
+                    source,
+                    column_index_in_insert,
+                    self.meta_provider,
+                )?;
 
                 Ok(Plan::Insert(InsertPlan {
                     table,
-                    rows,
+                    source,
                     default_value_map,
                 }))
             }
@@ -1105,7 +1111,7 @@ fn normalize_func_name(sql_stmt: &mut SqlStatement) {
 }
 
 #[derive(Debug)]
-enum InsertMode {
+pub enum InsertMode {
     // Insert the value in expr with given index directly.
     Direct(usize),
     // No value provided, insert a null.
@@ -1154,12 +1160,13 @@ fn parse_data_value_from_expr(data_type: DatumKind, 
expr: &mut Expr) -> Result<D
     }
 }
 
-/// Build RowGroup
-fn build_row_group(
+/// Build InsertSource
+fn build_insert_source<P: MetaProvider>(
     schema: Schema,
     source: Box<Query>,
     column_index_in_insert: Vec<InsertMode>,
-) -> Result<RowGroup> {
+    meta_provider: ContextProviderAdapter<P>,
+) -> Result<InsertSource> {
     // Build row group by schema
     match *source.body {
         SetExpr::Values(Values {
@@ -1207,7 +1214,33 @@ fn build_row_group(
             }
 
             // Build the whole row group
-            Ok(RowGroup::new_unchecked(schema, rows))
+            Ok(InsertSource::Values {
+                row_group: RowGroup::new_unchecked(schema, rows),
+            })
+        }
+        SetExpr::Select(..) => {
+            let mut select_stmt = SqlStatement::Query(source);
+            normalize_func_name(&mut select_stmt);
+
+            let df_planner = SqlToRel::new_with_options(&meta_provider, 
DEFAULT_PARSER_OPTS);
+            let select_table_name = 
parse_table_name_with_standard(&select_stmt);
+
+            let select_df_plan = df_planner
+                .sql_statement_to_plan(select_stmt)
+                .context(DatafusionPlan)?;
+            let select_df_plan = 
optimize_plan(&select_df_plan).context(DatafusionPlan)?;
+
+            // Get all tables needed in the plan
+            let tables = meta_provider.try_into_container().context(FindMeta)?;
+            let query = QueryPlan {
+                df_plan: select_df_plan,
+                table_name: select_table_name,
+                tables: Arc::new(tables),
+            };
+            Ok(InsertSource::Select {
+                query,
+                column_index_in_insert,
+            })
         }
         _ => InsertSourceBodyNotSet.fail(),
     }
@@ -1409,7 +1442,6 @@ pub fn get_table_ref(table_name: &str) -> TableReference {
 
 #[cfg(test)]
 pub mod tests {
-
     use datafusion::{
         common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
         datasource::source_as_provider,
@@ -1809,114 +1841,116 @@ pub mod tests {
                 ],
             },
         },
-        rows: RowGroup {
-            schema: Schema {
-                timestamp_index: 1,
-                tsid_index: None,
-                column_schemas: ColumnSchemas {
-                    columns: [
-                        ColumnSchema {
-                            id: 1,
-                            name: "key1",
-                            data_type: Varbinary,
-                            is_nullable: false,
-                            is_tag: false,
-                            is_dictionary: false,
-                            comment: "",
-                            escaped_name: "key1",
-                            default_value: None,
-                        },
-                        ColumnSchema {
-                            id: 2,
-                            name: "key2",
-                            data_type: Timestamp,
-                            is_nullable: false,
-                            is_tag: false,
-                            is_dictionary: false,
-                            comment: "",
-                            escaped_name: "key2",
-                            default_value: None,
-                        },
-                        ColumnSchema {
-                            id: 3,
-                            name: "field1",
-                            data_type: Double,
-                            is_nullable: true,
-                            is_tag: false,
-                            is_dictionary: false,
-                            comment: "",
-                            escaped_name: "field1",
-                            default_value: None,
-                        },
-                        ColumnSchema {
-                            id: 4,
-                            name: "field2",
-                            data_type: String,
-                            is_nullable: true,
-                            is_tag: false,
-                            is_dictionary: false,
-                            comment: "",
-                            escaped_name: "field2",
-                            default_value: None,
-                        },
-                        ColumnSchema {
-                            id: 5,
-                            name: "field3",
-                            data_type: Date,
-                            is_nullable: true,
-                            is_tag: false,
-                            is_dictionary: false,
-                            comment: "",
-                            escaped_name: "field3",
-                            default_value: None,
-                        },
-                        ColumnSchema {
-                            id: 6,
-                            name: "field4",
-                            data_type: Time,
-                            is_nullable: true,
-                            is_tag: false,
-                            is_dictionary: false,
-                            comment: "",
-                            escaped_name: "field4",
-                            default_value: None,
-                        },
+        source: Values {
+            row_group: RowGroup {
+                schema: Schema {
+                    timestamp_index: 1,
+                    tsid_index: None,
+                    column_schemas: ColumnSchemas {
+                        columns: [
+                            ColumnSchema {
+                                id: 1,
+                                name: "key1",
+                                data_type: Varbinary,
+                                is_nullable: false,
+                                is_tag: false,
+                                is_dictionary: false,
+                                comment: "",
+                                escaped_name: "key1",
+                                default_value: None,
+                            },
+                            ColumnSchema {
+                                id: 2,
+                                name: "key2",
+                                data_type: Timestamp,
+                                is_nullable: false,
+                                is_tag: false,
+                                is_dictionary: false,
+                                comment: "",
+                                escaped_name: "key2",
+                                default_value: None,
+                            },
+                            ColumnSchema {
+                                id: 3,
+                                name: "field1",
+                                data_type: Double,
+                                is_nullable: true,
+                                is_tag: false,
+                                is_dictionary: false,
+                                comment: "",
+                                escaped_name: "field1",
+                                default_value: None,
+                            },
+                            ColumnSchema {
+                                id: 4,
+                                name: "field2",
+                                data_type: String,
+                                is_nullable: true,
+                                is_tag: false,
+                                is_dictionary: false,
+                                comment: "",
+                                escaped_name: "field2",
+                                default_value: None,
+                            },
+                            ColumnSchema {
+                                id: 5,
+                                name: "field3",
+                                data_type: Date,
+                                is_nullable: true,
+                                is_tag: false,
+                                is_dictionary: false,
+                                comment: "",
+                                escaped_name: "field3",
+                                default_value: None,
+                            },
+                            ColumnSchema {
+                                id: 6,
+                                name: "field4",
+                                data_type: Time,
+                                is_nullable: true,
+                                is_tag: false,
+                                is_dictionary: false,
+                                comment: "",
+                                escaped_name: "field4",
+                                default_value: None,
+                            },
+                        ],
+                    },
+                    version: 1,
+                    primary_key_indexes: [
+                        0,
+                        1,
                     ],
                 },
-                version: 1,
-                primary_key_indexes: [
-                    0,
-                    1,
-                ],
-            },
-            rows: [
-                Row {
-                    cols: [
-                        Varbinary(
-                            b"tagk",
-                        ),
-                        Timestamp(
+                rows: [
+                    Row {
+                        cols: [
+                            Varbinary(
+                                b"tagk",
+                            ),
                             Timestamp(
-                                1638428434000,
+                                Timestamp(
+                                    1638428434000,
+                                ),
                             ),
-                        ),
-                        Double(
-                            100.0,
-                        ),
-                        String(
-                            StringBytes(
-                                b"hello3",
+                            Double(
+                                100.0,
                             ),
-                        ),
-                        Date(
-                            19275,
-                        ),
-                        Time(
-                            43200456000000,
-                        ),
-                    ],
-                },
-            ],
+                            String(
+                                StringBytes(
+                                    b"hello3",
+                                ),
+                            ),
+                            Date(
+                                19275,
+                            ),
+                            Time(
+                                43200456000000,
+                            ),
+                        ],
+                    },
+                ],
+            },
         },
         default_value_map: {},
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to