This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 44caa997 Feat concurrent replay wal (#1505)
44caa997 is described below
commit 44caa997e37991e570741c3da6c961563569da42
Author: MianChen <[email protected]>
AuthorDate: Mon Mar 25 19:52:05 2024 +0800
Feat concurrent replay wal (#1505)
## Rationale
Close https://github.com/apache/incubator-horaedb/issues/1498
## Detailed Changes
Recover table logs in parallel.
## Test Plan
CI
---------
Co-authored-by: chenmian.cm <[email protected]>
---
src/analytic_engine/src/instance/wal_replayer.rs | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs
index 78792541..6a996fe9 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -186,9 +186,24 @@ impl Replay for TableBasedReplay {
batch_size: context.wal_replay_batch_size,
..Default::default()
};
- for table_data in table_datas {
- let table_id = table_data.id;
- if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await {
+
+ let mut tasks = futures::stream::iter(
+ table_datas
+ .iter()
+ .map(|table_data| {
+ let table_id = table_data.id;
+ let read_ctx = &read_ctx;
+ async move {
+ let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
+ (table_id, ret)
+ }
+ })
+ .collect::<Vec<_>>(),
+ )
+ .buffer_unordered(20);
+ while let Some((table_id, ret)) = tasks.next().await {
+ if let Err(e) = ret {
+ // If occur error, mark this table as failed and store the cause.
failed_tables.insert(table_id, e);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]