This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new c5825cc7 refactor: insert select to stream mode (#1544)
c5825cc7 is described below

commit c5825cc716eca1fc66db8c4faca9a7549a3f119e
Author: MianChen <[email protected]>
AuthorDate: Thu Jul 18 01:40:30 2024 -0500

    refactor: insert select to stream mode (#1544)
    
    ## Rationale
    Close #1542
    
    ## Detailed Changes
    Perform the select-and-insert procedure in a streaming way.
    
    ## Test Plan
    CI test.
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 .../cases/env/local/dml/insert_into_select.result  |  10 +-
 .../cases/env/local/dml/insert_into_select.sql     |   4 +-
 src/interpreters/Cargo.toml                        |   1 +
 src/interpreters/src/insert.rs                     | 170 ++++++++++++++++-----
 4 files changed, 143 insertions(+), 42 deletions(-)

diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result b/integration_tests/cases/env/local/dml/insert_into_select.result
index 77b648a2..93fc8256 100644
--- a/integration_tests/cases/env/local/dml/insert_into_select.result
+++ b/integration_tests/cases/env/local/dml/insert_into_select.result
@@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
 VALUES
     (1, 100, "s1"),
     (2, 200, "s2"),
-    (3, 300, "s3");
+    (3, 300, "s3"),
+    (4, 400, "s4"),
+    (5, 500, "s5");
 
-affected_rows: 3
+affected_rows: 5
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
@@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
 SELECT `timestamp`, `value`
 FROM `insert_into_select_table1`;
 
-affected_rows: 3
+affected_rows: 5
 
 SELECT `timestamp`, `value`, `name`
 FROM `insert_into_select_table2`;
@@ -67,6 +69,8 @@ timestamp,value,name,
 Timestamp(1),Int32(100),String(""),
 Timestamp(2),Int32(200),String(""),
 Timestamp(3),Int32(300),String(""),
+Timestamp(4),Int32(400),String(""),
+Timestamp(5),Int32(500),String(""),
 
 
 DROP TABLE `insert_into_select_table1`;
diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql b/integration_tests/cases/env/local/dml/insert_into_select.sql
index dbe15760..1a0d4a1d 100644
--- a/integration_tests/cases/env/local/dml/insert_into_select.sql
+++ b/integration_tests/cases/env/local/dml/insert_into_select.sql
@@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
 VALUES
     (1, 100, "s1"),
     (2, 200, "s2"),
-    (3, 300, "s3");
+    (3, 300, "s3"),
+    (4, 400, "s4"),
+    (5, 500, "s5");
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
diff --git a/src/interpreters/Cargo.toml b/src/interpreters/Cargo.toml
index 6f1513fb..d237d5b8 100644
--- a/src/interpreters/Cargo.toml
+++ b/src/interpreters/Cargo.toml
@@ -54,6 +54,7 @@ regex = { workspace = true }
 runtime = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
+tokio = { workspace = true }
 
 [dev-dependencies]
 analytic_engine = { workspace = true, features = ["test"] }
diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index d6307bf1..5d9e254f 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
     column_block::{ColumnBlock, ColumnBlockBuilder},
     column_schema::ColumnId,
     datum::Datum,
+    record_batch::RecordBatch as CommonRecordBatch,
     row::{Row, RowBuilder, RowGroup},
     schema::Schema,
 };
@@ -54,12 +55,15 @@ use query_frontend::{
 };
 use runtime::Priority;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use table_engine::table::{TableRef, WriteRequest};
+use table_engine::{
+    stream::SendableRecordBatchStream,
+    table::{TableRef, WriteRequest},
+};
+use tokio::sync::mpsc;
 
 use crate::{
     context::Context,
     interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as 
InterpreterResult},
-    RecordBatchVec,
 };
 
 #[derive(Debug, Snafu)]
@@ -115,10 +119,23 @@ pub enum Error {
 
     #[snafu(display("Record columns not enough, len:{}, index:{}", len, 
index))]
     RecordColumnsNotEnough { len: usize, index: usize },
+
+    #[snafu(display("Failed to do select, err:{}", source))]
+    Select { source: table_engine::stream::Error },
+
+    #[snafu(display("Failed to send msg in channel, err:{}", msg))]
+    MsgChannel { msg: String },
+
+    #[snafu(display("Failed to join async task, err:{}", msg))]
+    AsyncTask { msg: String },
 }
 
 define_result!(Error);
 
+// TODO: make those configurable
+const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000;
+const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3;
+
 pub struct InsertInterpreter {
     ctx: Context,
     plan: InsertPlan,
@@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter {
             default_value_map,
         } = self.plan;
 
-        let mut rows = match source {
-            InsertSource::Values { row_group } => row_group,
+        match source {
+            InsertSource::Values { row_group: rows } => {
+                let num_rows =
+                    prepare_and_write_table(table.clone(), rows, 
&default_value_map).await?;
+
+                Ok(Output::AffectedRows(num_rows))
+            }
             InsertSource::Select {
                 query: query_plan,
                 column_index_in_insert,
             } => {
-                // TODO: support streaming insert
-                let record_batches = exec_select_logical_plan(
+                let mut record_batches_stream = exec_select_logical_plan(
                     self.ctx,
                     query_plan,
                     self.executor,
@@ -168,30 +189,112 @@ impl Interpreter for InsertInterpreter {
                 .await
                 .context(Insert)?;
 
-                if record_batches.is_empty() {
-                    return Ok(Output::AffectedRows(0));
-                }
+                let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM);
+                let producer = tokio::spawn(async move {
+                    while let Some(record_batch) = record_batches_stream
+                        .try_next()
+                        .await
+                        .context(Select)
+                        .context(Insert)?
+                    {
+                        if record_batch.is_empty() {
+                            continue;
+                        }
+                        if let Err(e) = tx.send(record_batch).await {
+                            return Err(Error::MsgChannel {
+                                msg: format!("{}", e),
+                            })
+                            .context(Insert)?;
+                        }
+                    }
+                    Ok(())
+                });
+
+                let consumer = tokio::spawn(async move {
+                    let mut rx = rx;
+                    let mut result_rows = 0;
+                    let mut pending_rows = 0;
+                    let mut record_batches = Vec::new();
+                    while let Some(record_batch) = rx.recv().await {
+                        pending_rows += record_batch.num_rows();
+                        record_batches.push(record_batch);
+                        if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM {
+                            pending_rows = 0;
+                            let num_rows = write_record_batches(
+                                &mut record_batches,
+                                column_index_in_insert.as_slice(),
+                                table.clone(),
+                                &default_value_map,
+                            )
+                            .await?;
+                            result_rows += num_rows;
+                        }
+                    }
 
-                convert_records_to_row_group(record_batches, 
column_index_in_insert, table.schema())
-                    .context(Insert)?
+                    if !record_batches.is_empty() {
+                        let num_rows = write_record_batches(
+                            &mut record_batches,
+                            column_index_in_insert.as_slice(),
+                            table,
+                            &default_value_map,
+                        )
+                        .await?;
+                        result_rows += num_rows;
+                    }
+                    Ok(result_rows)
+                });
+
+                match tokio::try_join!(producer, consumer) {
+                    Ok((select_res, write_rows)) => {
+                        select_res?;
+                        Ok(Output::AffectedRows(write_rows?))
+                    }
+                    Err(e) => Err(Error::AsyncTask {
+                        msg: format!("{}", e),
+                    })
+                    .context(Insert)?,
+                }
             }
-        };
+        }
+    }
+}
 
-        maybe_generate_tsid(&mut rows).context(Insert)?;
+async fn write_record_batches(
+    record_batches: &mut Vec<CommonRecordBatch>,
+    column_index_in_insert: &[InsertMode],
+    table: TableRef,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    let row_group = convert_records_to_row_group(
+        record_batches.as_slice(),
+        column_index_in_insert,
+        table.schema(),
+    )
+    .context(Insert)?;
+    record_batches.clear();
+
+    prepare_and_write_table(table, row_group, default_value_map).await
+}
 
-        // Fill default values
-        fill_default_values(table.clone(), &mut rows, 
&default_value_map).context(Insert)?;
+async fn prepare_and_write_table(
+    table: TableRef,
+    mut row_group: RowGroup,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    maybe_generate_tsid(&mut row_group).context(Insert)?;
 
-        let request = WriteRequest { row_group: rows };
+    // Fill default values
+    fill_default_values(table.clone(), &mut row_group, 
default_value_map).context(Insert)?;
 
-        let num_rows = table
-            .write(request)
-            .await
-            .context(WriteTable)
-            .context(Insert)?;
+    let request = WriteRequest { row_group };
 
-        Ok(Output::AffectedRows(num_rows))
-    }
+    let num_rows = table
+        .write(request)
+        .await
+        .context(WriteTable)
+        .context(Insert)?;
+
+    Ok(num_rows)
 }
 
 async fn exec_select_logical_plan(
@@ -199,7 +302,7 @@ async fn exec_select_logical_plan(
     query_plan: QueryPlan,
     executor: ExecutorRef,
     physical_planner: PhysicalPlannerRef,
-) -> Result<RecordBatchVec> {
+) -> Result<SendableRecordBatchStream> {
     let priority = Priority::High;
 
     let query_ctx = ctx
@@ -216,7 +319,7 @@ async fn exec_select_logical_plan(
         })?;
 
     // Execute select physical plan.
-    let record_batch_stream = executor
+    let record_batch_stream: SendableRecordBatchStream = executor
         .execute(&query_ctx, physical_plan)
         .await
         .box_err()
@@ -224,26 +327,17 @@ async fn exec_select_logical_plan(
             msg: "failed to execute select physical plan",
         })?;
 
-    let record_batches =
-        record_batch_stream
-            .try_collect()
-            .await
-            .box_err()
-            .context(ExecuteSelectPlan {
-                msg: "failed to collect select execution results",
-            })?;
-
-    Ok(record_batches)
+    Ok(record_batch_stream)
 }
 
 fn convert_records_to_row_group(
-    record_batches: RecordBatchVec,
-    column_index_in_insert: Vec<InsertMode>,
+    record_batches: &[CommonRecordBatch],
+    column_index_in_insert: &[InsertMode],
     schema: Schema,
 ) -> Result<RowGroup> {
     let mut data_rows: Vec<Row> = Vec::new();
 
-    for record in &record_batches {
+    for record in record_batches {
         let num_cols = record.num_columns();
         let num_rows = record.num_rows();
         for row_idx in 0..num_rows {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to