This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new c5825cc7 refactor: insert select to stream mode (#1544)
c5825cc7 is described below
commit c5825cc716eca1fc66db8c4faca9a7549a3f119e
Author: MianChen <[email protected]>
AuthorDate: Thu Jul 18 01:40:30 2024 -0500
refactor: insert select to stream mode (#1544)
## Rationale
Close #1542
## Detailed Changes
Execute the select-then-insert procedure in a streaming fashion, so selected rows are written out batch by batch instead of being collected in memory first; a conceptual sketch follows below.
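For orientation, here is a minimal, self-contained sketch of the bounded producer/consumer pattern this change introduces. The toy `RecordBatch` alias and the `write_batches` helper are hypothetical stand-ins for the real HoraeDB types and write path; only a `tokio` dependency (with the `full` feature) is assumed:

```rust
use tokio::sync::mpsc;

// Hypothetical stand-in for the real record batch type.
type RecordBatch = Vec<u64>;

const ROW_BATCH_NUM: usize = 1000; // flush threshold, mirrors INSERT_SELECT_ROW_BATCH_NUM
const PENDING_BATCH_NUM: usize = 3; // channel capacity, mirrors INSERT_SELECT_PENDING_BATCH_NUM

// Stand-in for converting batches to a RowGroup and writing the table.
async fn write_batches(batches: &mut Vec<RecordBatch>) -> usize {
    let rows: usize = batches.iter().map(|b| b.len()).sum();
    batches.clear();
    rows
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<RecordBatch>(PENDING_BATCH_NUM);

    // Producer: pulls batches from the select stream and sends them over a
    // bounded channel, so at most PENDING_BATCH_NUM batches are in flight.
    let producer = tokio::spawn(async move {
        for i in 0..10u64 {
            let batch: RecordBatch = vec![i; 400]; // pretend 400-row batch
            if tx.send(batch).await.is_err() {
                break; // consumer hung up
            }
        }
    });

    // Consumer: accumulates batches until the row threshold is hit, then flushes.
    let consumer = tokio::spawn(async move {
        let mut total = 0usize;
        let mut pending_rows = 0usize;
        let mut pending: Vec<RecordBatch> = Vec::new();
        while let Some(batch) = rx.recv().await {
            pending_rows += batch.len();
            pending.push(batch);
            if pending_rows >= ROW_BATCH_NUM {
                pending_rows = 0;
                total += write_batches(&mut pending).await;
            }
        }
        if !pending.is_empty() {
            total += write_batches(&mut pending).await; // final partial flush
        }
        total
    });

    let (_, written) = tokio::try_join!(producer, consumer).unwrap();
    println!("affected_rows: {written}");
}
```

The bounded channel caps memory use: the select side stalls once PENDING_BATCH_NUM batches are queued, while the write side flushes whenever at least ROW_BATCH_NUM rows have accumulated.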
## Test Plan
CI test.
---------
Co-authored-by: jiacai2050 <[email protected]>
---
.../cases/env/local/dml/insert_into_select.result | 10 +-
.../cases/env/local/dml/insert_into_select.sql | 4 +-
src/interpreters/Cargo.toml | 1 +
src/interpreters/src/insert.rs | 170 ++++++++++++++++-----
4 files changed, 143 insertions(+), 42 deletions(-)
diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result b/integration_tests/cases/env/local/dml/insert_into_select.result
index 77b648a2..93fc8256 100644
--- a/integration_tests/cases/env/local/dml/insert_into_select.result
+++ b/integration_tests/cases/env/local/dml/insert_into_select.result
@@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
VALUES
(1, 100, "s1"),
(2, 200, "s2"),
- (3, 300, "s3");
+ (3, 300, "s3"),
+ (4, 400, "s4"),
+ (5, 500, "s5");
-affected_rows: 3
+affected_rows: 5
DROP TABLE IF EXISTS `insert_into_select_table2`;
@@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
SELECT `timestamp`, `value`
FROM `insert_into_select_table1`;
-affected_rows: 3
+affected_rows: 5
SELECT `timestamp`, `value`, `name`
FROM `insert_into_select_table2`;
@@ -67,6 +69,8 @@ timestamp,value,name,
Timestamp(1),Int32(100),String(""),
Timestamp(2),Int32(200),String(""),
Timestamp(3),Int32(300),String(""),
+Timestamp(4),Int32(400),String(""),
+Timestamp(5),Int32(500),String(""),
DROP TABLE `insert_into_select_table1`;
diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql b/integration_tests/cases/env/local/dml/insert_into_select.sql
index dbe15760..1a0d4a1d 100644
--- a/integration_tests/cases/env/local/dml/insert_into_select.sql
+++ b/integration_tests/cases/env/local/dml/insert_into_select.sql
@@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
VALUES
(1, 100, "s1"),
(2, 200, "s2"),
- (3, 300, "s3");
+ (3, 300, "s3"),
+ (4, 400, "s4"),
+ (5, 500, "s5");
DROP TABLE IF EXISTS `insert_into_select_table2`;
diff --git a/src/interpreters/Cargo.toml b/src/interpreters/Cargo.toml
index 6f1513fb..d237d5b8 100644
--- a/src/interpreters/Cargo.toml
+++ b/src/interpreters/Cargo.toml
@@ -54,6 +54,7 @@ regex = { workspace = true }
runtime = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
+tokio = { workspace = true }
[dev-dependencies]
analytic_engine = { workspace = true, features = ["test"] }
diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index d6307bf1..5d9e254f 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
column_block::{ColumnBlock, ColumnBlockBuilder},
column_schema::ColumnId,
datum::Datum,
+ record_batch::RecordBatch as CommonRecordBatch,
row::{Row, RowBuilder, RowGroup},
schema::Schema,
};
@@ -54,12 +55,15 @@ use query_frontend::{
};
use runtime::Priority;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use table_engine::table::{TableRef, WriteRequest};
+use table_engine::{
+ stream::SendableRecordBatchStream,
+ table::{TableRef, WriteRequest},
+};
+use tokio::sync::mpsc;
use crate::{
context::Context,
    interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
- RecordBatchVec,
};
#[derive(Debug, Snafu)]
@@ -115,10 +119,23 @@ pub enum Error {
#[snafu(display("Record columns not enough, len:{}, index:{}", len,
index))]
RecordColumnsNotEnough { len: usize, index: usize },
+
+ #[snafu(display("Failed to do select, err:{}", source))]
+ Select { source: table_engine::stream::Error },
+
+ #[snafu(display("Failed to send msg in channel, err:{}", msg))]
+ MsgChannel { msg: String },
+
+ #[snafu(display("Failed to join async task, err:{}", msg))]
+ AsyncTask { msg: String },
}
define_result!(Error);
+// TODO: make these configurable
+const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000;
+const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3;
+
pub struct InsertInterpreter {
ctx: Context,
plan: InsertPlan,
@@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter {
default_value_map,
} = self.plan;
- let mut rows = match source {
- InsertSource::Values { row_group } => row_group,
+ match source {
+ InsertSource::Values { row_group: rows } => {
+ let num_rows =
+                    prepare_and_write_table(table.clone(), rows, &default_value_map).await?;
+
+ Ok(Output::AffectedRows(num_rows))
+ }
InsertSource::Select {
query: query_plan,
column_index_in_insert,
} => {
- // TODO: support streaming insert
- let record_batches = exec_select_logical_plan(
+ let mut record_batches_stream = exec_select_logical_plan(
self.ctx,
query_plan,
self.executor,
@@ -168,30 +189,112 @@ impl Interpreter for InsertInterpreter {
.await
.context(Insert)?;
- if record_batches.is_empty() {
- return Ok(Output::AffectedRows(0));
- }
+ let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM);
+ let producer = tokio::spawn(async move {
+ while let Some(record_batch) = record_batches_stream
+ .try_next()
+ .await
+ .context(Select)
+ .context(Insert)?
+ {
+ if record_batch.is_empty() {
+ continue;
+ }
+ if let Err(e) = tx.send(record_batch).await {
+ return Err(Error::MsgChannel {
+ msg: format!("{}", e),
+ })
+ .context(Insert)?;
+ }
+ }
+ Ok(())
+ });
+
+ let consumer = tokio::spawn(async move {
+ let mut rx = rx;
+ let mut result_rows = 0;
+ let mut pending_rows = 0;
+ let mut record_batches = Vec::new();
+ while let Some(record_batch) = rx.recv().await {
+ pending_rows += record_batch.num_rows();
+ record_batches.push(record_batch);
+ if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM {
+ pending_rows = 0;
+ let num_rows = write_record_batches(
+ &mut record_batches,
+ column_index_in_insert.as_slice(),
+ table.clone(),
+ &default_value_map,
+ )
+ .await?;
+ result_rows += num_rows;
+ }
+ }
-            convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
- .context(Insert)?
+ if !record_batches.is_empty() {
+ let num_rows = write_record_batches(
+ &mut record_batches,
+ column_index_in_insert.as_slice(),
+ table,
+ &default_value_map,
+ )
+ .await?;
+ result_rows += num_rows;
+ }
+ Ok(result_rows)
+ });
+
+ match tokio::try_join!(producer, consumer) {
+ Ok((select_res, write_rows)) => {
+ select_res?;
+ Ok(Output::AffectedRows(write_rows?))
+ }
+ Err(e) => Err(Error::AsyncTask {
+ msg: format!("{}", e),
+ })
+ .context(Insert)?,
+ }
}
- };
+ }
+ }
+}
- maybe_generate_tsid(&mut rows).context(Insert)?;
+async fn write_record_batches(
+ record_batches: &mut Vec<CommonRecordBatch>,
+ column_index_in_insert: &[InsertMode],
+ table: TableRef,
+ default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+ let row_group = convert_records_to_row_group(
+ record_batches.as_slice(),
+ column_index_in_insert,
+ table.schema(),
+ )
+ .context(Insert)?;
+ record_batches.clear();
+
+ prepare_and_write_table(table, row_group, default_value_map).await
+}
- // Fill default values
-        fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
+async fn prepare_and_write_table(
+ table: TableRef,
+ mut row_group: RowGroup,
+ default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+ maybe_generate_tsid(&mut row_group).context(Insert)?;
- let request = WriteRequest { row_group: rows };
+ // Fill default values
+    fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;
- let num_rows = table
- .write(request)
- .await
- .context(WriteTable)
- .context(Insert)?;
+ let request = WriteRequest { row_group };
- Ok(Output::AffectedRows(num_rows))
- }
+ let num_rows = table
+ .write(request)
+ .await
+ .context(WriteTable)
+ .context(Insert)?;
+
+ Ok(num_rows)
}
async fn exec_select_logical_plan(
@@ -199,7 +302,7 @@ async fn exec_select_logical_plan(
query_plan: QueryPlan,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
-) -> Result<RecordBatchVec> {
+) -> Result<SendableRecordBatchStream> {
let priority = Priority::High;
let query_ctx = ctx
@@ -216,7 +319,7 @@ async fn exec_select_logical_plan(
})?;
// Execute select physical plan.
- let record_batch_stream = executor
+ let record_batch_stream: SendableRecordBatchStream = executor
.execute(&query_ctx, physical_plan)
.await
.box_err()
@@ -224,26 +327,17 @@ async fn exec_select_logical_plan(
msg: "failed to execute select physical plan",
})?;
- let record_batches =
- record_batch_stream
- .try_collect()
- .await
- .box_err()
- .context(ExecuteSelectPlan {
- msg: "failed to collect select execution results",
- })?;
-
- Ok(record_batches)
+ Ok(record_batch_stream)
}
fn convert_records_to_row_group(
- record_batches: RecordBatchVec,
- column_index_in_insert: Vec<InsertMode>,
+ record_batches: &[CommonRecordBatch],
+ column_index_in_insert: &[InsertMode],
schema: Schema,
) -> Result<RowGroup> {
let mut data_rows: Vec<Row> = Vec::new();
- for record in &record_batches {
+ for record in record_batches {
let num_cols = record.num_columns();
let num_rows = record.num_rows();
for row_idx in 0..num_rows {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]