This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 44caa997 Feat concurrent replay wal (#1505)
44caa997 is described below

commit 44caa997e37991e570741c3da6c961563569da42
Author: MianChen <[email protected]>
AuthorDate: Mon Mar 25 19:52:05 2024 +0800

    Feat concurrent replay wal (#1505)
    
    ## Rationale
    Close https://github.com/apache/incubator-horaedb/issues/1498
    
    ## Detailed Changes
    Recover table logs in parallel.
    
    ## Test Plan
    CI
    
    ---------
    
    Co-authored-by: chenmian.cm <[email protected]>
---
 src/analytic_engine/src/instance/wal_replayer.rs | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/src/analytic_engine/src/instance/wal_replayer.rs b/src/analytic_engine/src/instance/wal_replayer.rs
index 78792541..6a996fe9 100644
--- a/src/analytic_engine/src/instance/wal_replayer.rs
+++ b/src/analytic_engine/src/instance/wal_replayer.rs
@@ -186,9 +186,24 @@ impl Replay for TableBasedReplay {
             batch_size: context.wal_replay_batch_size,
             ..Default::default()
         };
-        for table_data in table_datas {
-            let table_id = table_data.id;
-            if let Err(e) = Self::recover_table_logs(context, table_data, &read_ctx).await {
+
+        let mut tasks = futures::stream::iter(
+            table_datas
+                .iter()
+                .map(|table_data| {
+                    let table_id = table_data.id;
+                    let read_ctx = &read_ctx;
+                    async move {
+                        let ret = Self::recover_table_logs(context, table_data, read_ctx).await;
+                        (table_id, ret)
+                    }
+                })
+                .collect::<Vec<_>>(),
+        )
+        .buffer_unordered(20);
+        while let Some((table_id, ret)) = tasks.next().await {
+            if let Err(e) = ret {
+                // If occur error, mark this table as failed and store the cause.
                 failed_tables.insert(table_id, e);
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to