jiacai2050 commented on code in PR #1505:
URL: 
https://github.com/apache/incubator-horaedb/pull/1505#discussion_r1536981615


##########
src/analytic_engine/src/instance/wal_replayer.rs:
##########
@@ -181,19 +181,41 @@ impl Replay for TableBasedReplay {
     ) -> Result<FailedTables> {
         debug!("Replay wal logs on table mode, context:{context}, 
tables:{table_datas:?}",);
 
-        let mut failed_tables = HashMap::new();
-        let read_ctx = ReadContext {
+        let failed_tables = Arc::new(StdMutex::new(HashMap::new()));
+        let read_ctx = Arc::new(ReadContext {
             batch_size: context.wal_replay_batch_size,
             ..Default::default()
-        };
-        for table_data in table_datas {
-            let table_id = table_data.id;
-            if let Err(e) = Self::recover_table_logs(context, table_data, 
&read_ctx).await {
-                failed_tables.insert(table_id, e);
-            }
-        }
+        });
+
+        futures::stream::iter(
+            table_datas
+                .iter()
+                .map(|table_data| {
+                    let table_id = table_data.id;
+                    let read_ctx = read_ctx.clone();
+                    async move {
+                        let ret = Self::recover_table_logs(context, 
table_data, &read_ctx).await;
+                        (table_id, ret)
+                    }
+                })
+                .collect::<Vec<_>>(),
+        )
+        .for_each_concurrent(
+            None, // Is it ok to unlimit the concurrency?
+            |result| async {
+                let (table_id, ret) = result.await;
+                if let Err(e) = ret {
+                    // If occur error, mark this table as failed and store the 
cause.
+                    failed_tables.lock().unwrap().insert(table_id, e);
+                }
+            },
+        )
+        .await;
 
-        Ok(failed_tables)
+        Ok(Arc::try_unwrap(failed_tables)

Review Comment:
   It's not necessary to wrap `failed_tables` in a Mutex, you can refer how we 
do like this before here:
   
   
https://github.com/apache/incubator-horaedb/blob/54a5db725e674934ca490a64118d50df1b1f4cde/src/analytic_engine/src/instance/wal_replayer.rs#L401-L408



##########
src/analytic_engine/src/instance/wal_replayer.rs:
##########
@@ -181,19 +181,41 @@ impl Replay for TableBasedReplay {
     ) -> Result<FailedTables> {
         debug!("Replay wal logs on table mode, context:{context}, 
tables:{table_datas:?}",);
 
-        let mut failed_tables = HashMap::new();
-        let read_ctx = ReadContext {
+        let failed_tables = Arc::new(StdMutex::new(HashMap::new()));
+        let read_ctx = Arc::new(ReadContext {
             batch_size: context.wal_replay_batch_size,
             ..Default::default()
-        };
-        for table_data in table_datas {
-            let table_id = table_data.id;
-            if let Err(e) = Self::recover_table_logs(context, table_data, 
&read_ctx).await {
-                failed_tables.insert(table_id, e);
-            }
-        }
+        });
+
+        futures::stream::iter(
+            table_datas
+                .iter()
+                .map(|table_data| {
+                    let table_id = table_data.id;
+                    let read_ctx = read_ctx.clone();
+                    async move {
+                        let ret = Self::recover_table_logs(context, 
table_data, &read_ctx).await;
+                        (table_id, ret)
+                    }
+                })
+                .collect::<Vec<_>>(),
+        )
+        .for_each_concurrent(
+            None, // Is it ok to unlimit the concurrency?
+            |result| async {
+                let (table_id, ret) = result.await;
+                if let Err(e) = ret {
+                    // If occur error, mark this table as failed and store the 
cause.
+                    failed_tables.lock().unwrap().insert(table_id, e);
+                }
+            },
+        )
+        .await;
 
-        Ok(failed_tables)
+        Ok(Arc::try_unwrap(failed_tables)

Review Comment:
   It's not necessary to wrap `failed_tables` inside a Mutex, you can refer how 
we do like this before here:
   
   
https://github.com/apache/incubator-horaedb/blob/54a5db725e674934ca490a64118d50df1b1f4cde/src/analytic_engine/src/instance/wal_replayer.rs#L401-L408



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to