This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch nzx
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/nzx by this push:
     new 31c14443 用于双集群引擎同步debug-v4 (#1358)
31c14443 is described below

commit 31c144435da130eb074f3ba1ea2573257c0593be
Author: kkf1 <[email protected]>
AuthorDate: Tue Nov 8 23:18:01 2022 +0800

    用于双集群引擎同步debug-v4 (#1358)
    
    * 用于双集群引擎同步debug-v3
    
    * 用于双集群引擎同步debug-v4
---
 syncer/service/task/manager.go | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/syncer/service/task/manager.go b/syncer/service/task/manager.go
index 93e9d099..867d8ca4 100644
--- a/syncer/service/task/manager.go
+++ b/syncer/service/task/manager.go
@@ -185,18 +185,25 @@ func (m *manager) ListTasks(ctx context.Context) 
([]*carisync.Task, error) {
 
        noHandleTasks := make([]*carisync.Task, 0, len(tasks))
        skipTaskIDs := make([]string, 0, len(tasks))
+       allTaskIDs := make(map[string]bool)
        for _, t := range tasks {
                log.Info(fmt.Sprintf("list task id: %v", t.ID))
+               allTaskIDs[t.ID] = true
                _, ok := m.cache.Load(t.ID)
                if ok {
                        skipTaskIDs = append(skipTaskIDs, t.ID)
                        continue
                }
                m.cache.Store(t.ID, t)
-
                noHandleTasks = append(noHandleTasks, t)
                log.Info(fmt.Sprintf("no handle task id: %v", t.ID))
        }
+       m.cache.Range(func(key, value any) bool {
+               if _, ok := allTaskIDs[key.(string)]; !ok {
+                       m.cache.Delete(key)
+               }
+               return true
+       })
 
        log.Info(fmt.Sprintf("load task raw count %d, to handle count %d, skip 
ids %v",
                len(tasks), len(noHandleTasks), skipTaskIDs))
@@ -274,8 +281,6 @@ func (m *manager) handleResult(res *event.Result) {
                        log.Error("delete task failed", err)
                }
        }
-
-       m.cache.Delete(res.ID)
 }
 
 func (m *manager) handleTasks(sts syncTasks) {

Reply via email to