zealchen commented on code in PR #1505:
URL: 
https://github.com/apache/incubator-horaedb/pull/1505#discussion_r1536984333


##########
src/analytic_engine/src/instance/wal_replayer.rs:
##########
@@ -181,19 +181,41 @@ impl Replay for TableBasedReplay {
     ) -> Result<FailedTables> {
         debug!("Replay wal logs on table mode, context:{context}, 
tables:{table_datas:?}",);
 
-        let mut failed_tables = HashMap::new();
-        let read_ctx = ReadContext {
+        let failed_tables = Arc::new(StdMutex::new(HashMap::new()));
+        let read_ctx = Arc::new(ReadContext {
             batch_size: context.wal_replay_batch_size,
             ..Default::default()
-        };
-        for table_data in table_datas {
-            let table_id = table_data.id;
-            if let Err(e) = Self::recover_table_logs(context, table_data, 
&read_ctx).await {
-                failed_tables.insert(table_id, e);
-            }
-        }
+        });
+
+        futures::stream::iter(
+            table_datas
+                .iter()
+                .map(|table_data| {
+                    let table_id = table_data.id;
+                    let read_ctx = read_ctx.clone();
+                    async move {
+                        let ret = Self::recover_table_logs(context, 
table_data, &read_ctx).await;
+                        (table_id, ret)
+                    }
+                })
+                .collect::<Vec<_>>(),
+        )
+        .for_each_concurrent(
+            None, // Is it ok to unlimit the concurrency?
+            |result| async {
+                let (table_id, ret) = result.await;
+                if let Err(e) = ret {
+                    // If occur error, mark this table as failed and store the 
cause.
+                    failed_tables.lock().unwrap().insert(table_id, e);
+                }
+            },
+        )
+        .await;
 
-        Ok(failed_tables)
+        Ok(Arc::try_unwrap(failed_tables)

Review Comment:
   Actually, in my previous commit 
[here](https://github.com/apache/incubator-horaedb/pull/1505/commits/8c3bd80747d63fc3766bafc2c961d72a53301898),
 I did like that. I feel like the current commit is more rusty. If you feel it 
is not the best practice I will roll back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to