Lethannn commented on code in PR #1492:
URL:
https://github.com/apache/incubator-horaedb/pull/1492#discussion_r1518559204
##########
src/analytic_engine/src/instance/wal_replayer.rs:
##########
@@ -374,31 +376,54 @@ impl RegionBasedReplay {
// TODO: No `group_by` method in `VecDeque`, so implement it manually
here...
Self::split_log_batch_by_table(log_batch, &mut table_batches);
- // TODO: Replay logs of different tables in parallel.
- for table_batch in table_batches {
- // Some tables may have failed in previous replay, ignore them.
- if failed_tables.contains_key(&table_batch.table_id) {
- continue;
- }
-
- // Replay all log entries of current table.
- // Some tables may have been moved to other shards or dropped,
ignore such logs.
- if let Some(ctx) = serial_exec_ctxs.get_mut(&table_batch.table_id)
{
- let result = replay_table_log_entries(
- &context.flusher,
- context.max_retry_flush_limit,
- &mut ctx.serial_exec,
- &ctx.table_data,
- log_batch.range(table_batch.range),
- )
- .await;
-
- // If occur error, mark this table as failed and store the
cause.
- if let Err(e) = result {
- failed_tables.insert(table_batch.table_id, e);
+ let alter_failed_tables = HashMap::new();
+ let alter_failed_tables_ref =
Arc::new(Mutex::new(alter_failed_tables));
+
+ let mut serial_exec_ctxs_dash_map = DashMap::new();
Review Comment:
/// Replay one batch of WAL log entries, grouped per table, scheduling the
/// per-table replays as futures driven with bounded concurrency.
///
/// - `context`: shared replay configuration (flusher, retry limit).
/// - `log_batch`: the raw WAL entries for this batch; split by table below.
/// - `serial_exec_ctxs`: per-table serial-execution contexts, shared behind an
///   async mutex so each spawned future can look its table up.
/// - `failed_tables`: accumulates the first error seen per table; tables
///   already present here are skipped, and logs for tables absent from
///   `serial_exec_ctxs` (moved/dropped tables) are ignored.
async fn replay_single_batch(
    context: &ReplayContext,
    log_batch: VecDeque<LogEntry<ReadPayload>>,
    serial_exec_ctxs: Arc<tokio::sync::Mutex<HashMap<TableId, SerialExecContext<'_>>>>,
    failed_tables: &mut FailedTables,
) -> Result<()> {
    let mut table_batches = Vec::new();
    // TODO: No `group_by` method in `VecDeque`, so implement it manually here...
    Self::split_log_batch_by_table(log_batch, &mut table_batches);

    // TODO: Replay logs of different tables in parallel.
    let mut replay_tasks = Vec::with_capacity(table_batches.len());
    for table_batch in table_batches {
        // Some tables may have failed in previous replay, ignore them.
        if failed_tables.contains_key(&table_batch.table_id) {
            continue;
        }
        // Clone the Arc so each future owns its own handle to the shared map.
        let serial_exec_ctxs = serial_exec_ctxs.clone();
        replay_tasks.push(async move {
            // NOTE(review): the MutexGuard produced by `lock().await` here is a
            // temporary in the `if let` scrutinee, so it is held for the whole
            // `if let` body — including across the `replay_table_log_entries`
            // await below. That serializes all table replays despite
            // `buffer_unordered(20)`; confirm whether this is acceptable or
            // whether the ctx should be taken out of the map before awaiting.
            if let Some(ctx) = serial_exec_ctxs.lock().await.get_mut(&table_batch.table_id) {
                let result = replay_table_log_entries(
                    &context.flusher,
                    context.max_retry_flush_limit,
                    &mut ctx.serial_exec,
                    &ctx.table_data,
                    // Only this table's slice of the batch is replayed.
                    log_batch.range(table_batch.range),
                )
                .await;
                (table_batch.table_id, result)
            } else {
                // Table has been moved to another shard or dropped; its logs
                // are intentionally ignored and reported as success.
                (table_batch.table_id, Ok(()))
            }
        });
    }

    // Drive at most 20 replay futures at a time, collecting results in
    // completion order.
    for (table_id, ret) in futures::stream::iter(replay_tasks)
        .buffer_unordered(20)
        .collect::<Vec<_>>()
        .await
    {
        // If an error occurs, mark this table as failed and store the cause.
        if let Err(e) = ret {
            failed_tables.insert(table_id, e);
        }
    }
    Ok(())
}
I ran into the same compile failure before. Here is my code. Is this what you
were expecting? However, my concern is: wouldn't
`serial_exec_ctxs.lock().await.get_mut` break concurrency?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]