When choosing the scheduling bucket for the next event group in
rpl_parallel_entry::choose_thread(), use an explicit FIFO for the
round-robin selection instead of a simple cyclic counter i := (i+1) % N.

This allows to schedule XA COMMIT/ROLLBACK dependencies explicitly without
changing the round-robin scheduling of other event groups.

Signed-off-by: Kristian Nielsen <kniel...@knielsen-hq.org>
---
 sql/rpl_parallel.cc | 54 +++++++++++++++++++++++++++------------------
 sql/rpl_parallel.h  | 13 ++++++++---
 2 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 6d6b78054f9..5be700a700f 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2356,33 +2356,38 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, 
bool *did_enter_cond,
                                   PSI_stage_info *old_stage,
                                   Gtid_log_event *gtid_ev)
 {
-  uint32 idx;
+  sched_bucket *cur_thr;
   Relay_log_info *rli= rgi->rli;
   rpl_parallel_thread *thr;
 
-  idx= rpl_thread_idx;
   if (gtid_ev)
   {
+    /* New event group; cycle the thread scheduling buckets round-robin. */
+    thread_sched_fifo->push_back(thread_sched_fifo->get());
+
     if (gtid_ev->flags2 &
         (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
-      idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
-                        gtid_ev->xid.key_length()) % rpl_thread_max;
-    else
     {
-      ++idx;
-      if (idx >= rpl_thread_max)
-        idx= 0;
+      /*
+        For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE,
+        overriding the round-robin scheduling.
+      */
+      uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
+                        gtid_ev->xid.key_length()) % rpl_thread_max;
+      rpl_threads[idx].unlink();
+      thread_sched_fifo->append(rpl_threads + idx);
     }
-    rpl_thread_idx= idx;
   }
-  thr= rpl_threads[idx];
+  cur_thr= thread_sched_fifo->head();
+
+  thr= cur_thr->thr;
   if (thr)
   {
     *did_enter_cond= false;
     mysql_mutex_lock(&thr->LOCK_rpl_thread);
     for (;;)
     {
-      if (thr->current_owner != &rpl_threads[idx])
+      if (thr->current_owner != &cur_thr->thr)
       {
         /*
           The worker thread became idle, and returned to the free list and
@@ -2448,8 +2453,8 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, 
bool *did_enter_cond,
     }
   }
   if (!thr)
-    rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
-                                                             this);
+    cur_thr->thr= thr=
+      global_rpl_thread_pool.get_thread(&cur_thr->thr, this);
 
   return thr;
 }
@@ -2506,15 +2511,20 @@ rpl_parallel::find(uint32 domain_id)
     ulong count= opt_slave_domain_parallel_threads;
     if (count == 0 || count > opt_slave_parallel_threads)
       count= opt_slave_parallel_threads;
-    rpl_parallel_thread **p;
+    rpl_parallel_entry::sched_bucket *p;
+    I_List<rpl_parallel_entry::sched_bucket> *fifo;
     if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
                          &e, sizeof(*e),
                          &p, count*sizeof(*p),
+                         &fifo, sizeof(*fifo),
                          NULL))
     {
       my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
       return NULL;
     }
+    e->thread_sched_fifo = new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
+    for (ulong i= 0; i < count; ++i)
+      e->thread_sched_fifo->push_back(::new (p+i) 
rpl_parallel_entry::sched_bucket);
     e->rpl_threads= p;
     e->rpl_thread_max= count;
     e->domain_id= domain_id;
@@ -2580,10 +2590,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info 
*rli)
     mysql_mutex_unlock(&e->LOCK_parallel_entry);
     for (j= 0; j < e->rpl_thread_max; ++j)
     {
-      if ((rpt= e->rpl_threads[j]))
+      if ((rpt= e->rpl_threads[j].thr))
       {
         mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-        if (rpt->current_owner == &e->rpl_threads[j])
+        if (rpt->current_owner == &e->rpl_threads[j].thr)
           mysql_cond_signal(&rpt->COND_rpl_thread);
         mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       }
@@ -2603,10 +2613,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info 
*rli)
     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
     for (j= 0; j < e->rpl_thread_max; ++j)
     {
-      if ((rpt= e->rpl_threads[j]))
+      if ((rpt= e->rpl_threads[j].thr))
       {
         mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-        while (rpt->current_owner == &e->rpl_threads[j])
+        while (rpt->current_owner == &e->rpl_threads[j].thr)
           mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
         mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       }
@@ -2653,7 +2663,7 @@ int
 rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
                                          Format_description_log_event *fdev)
 {
-  uint32 idx;
+  sched_bucket *cur_thr;
   rpl_parallel_thread *thr;
   rpl_parallel_thread::queued_event *qev;
   Relay_log_info *rli= rgi->rli;
@@ -2668,12 +2678,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info 
*rgi,
     Thus there is no need for the full complexity of choose_thread(). We only
     need to check if we have a current worker thread, and queue for it if so.
   */
-  idx= rpl_thread_idx;
-  thr= rpl_threads[idx];
+  cur_thr= thread_sched_fifo->head();
+  thr= cur_thr->thr;
   if (!thr)
     return 0;
   mysql_mutex_lock(&thr->LOCK_rpl_thread);
-  if (thr->current_owner != &rpl_threads[idx])
+  if (thr->current_owner != &cur_thr->thr)
   {
     /* No active worker thread, so no need to queue the master restart. */
     mysql_mutex_unlock(&thr->LOCK_rpl_thread);
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index bcce54bc0ec..9ba86453d06 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -326,6 +326,11 @@ struct rpl_parallel_thread_pool {
 
 
 struct rpl_parallel_entry {
+  struct sched_bucket : public ilink {
+    sched_bucket() : thr(nullptr) { }
+    rpl_parallel_thread *thr;
+  };
+
   mysql_mutex_t LOCK_parallel_entry;
   mysql_cond_t COND_parallel_entry;
   uint32 domain_id;
@@ -355,17 +360,19 @@ struct rpl_parallel_entry {
   uint64 stop_sub_id;
 
   /*
-    Cyclic array recording the last rpl_thread_max worker threads that we
+    Array recording the last rpl_thread_max worker threads that we
     queued event for. This is used to limit how many workers a single domain
     can occupy (--slave-domain-parallel-threads).
 
+    The array is structured as a FIFO using an I_List thread_sched_fifo.
+
     Note that workers are never explicitly deleted from the array. Instead,
     we need to check (under LOCK_rpl_thread) that the thread still belongs
     to us before re-using (rpl_thread::current_owner).
   */
-  rpl_parallel_thread **rpl_threads;
+  sched_bucket *rpl_threads;
+  I_List<sched_bucket> *thread_sched_fifo;
   uint32 rpl_thread_max;
-  uint32 rpl_thread_idx;
   /*
     The sub_id of the last transaction to commit within this domain_id.
     Must be accessed under LOCK_parallel_entry protection.
-- 
2.30.2

_______________________________________________
developers mailing list -- developers@lists.mariadb.org
To unsubscribe send an email to developers-le...@lists.mariadb.org

Reply via email to