This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f00b3aa [fix] inconsistency of config items while synchronizing data 
between … (#1359)
5f00b3aa is described below

commit 5f00b3aafeaaefe7162398238ad68127de117ff5
Author: kkf1 <[email protected]>
AuthorDate: Wed Nov 9 14:14:51 2022 +0800

    [fix] inconsistency of config items while synchronizing data between … 
(#1359)
    
    * [fix] inconsistency of config items while synchronizing data between two 
clusters
    
    * [fix] inconsistency of config items while synchronizing data between two 
clusters
---
 syncer/service/event/manager.go                | 50 +++++++++++++++++++-------
 syncer/service/replicator/resource/config.go   |  2 +-
 syncer/service/replicator/resource/resource.go |  6 +++-
 syncer/service/task/manager.go                 | 16 ++++++---
 4 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/syncer/service/event/manager.go b/syncer/service/event/manager.go
index 6c3b04ff..01ff0d23 100644
--- a/syncer/service/event/manager.go
+++ b/syncer/service/event/manager.go
@@ -34,7 +34,11 @@ import (
 )
 
 const (
-       DefaultInternal = 500 * time.Millisecond
+       DefaultInternal    = 500 * time.Millisecond
+       eventChanSize      = 1000
+       batchEventChanSize = 100
+       resChanSize        = 1000
+       eventSliceSize     = 100
 )
 
 var m Manager
@@ -95,10 +99,11 @@ func Replicator(r replicator.Replicator) ManagerOption {
 func NewManager(os ...ManagerOption) Manager {
        mo := toManagerOptions(os...)
        em := &ManagerImpl{
-               events:     make(chan *Event, 1000),
-               result:     make(chan *Result, 1000),
-               internal:   mo.internal,
-               Replicator: mo.replicator,
+               events:      make(chan *Event, eventChanSize),
+               batchEvents: make(chan []*Event, batchEventChanSize),
+               result:      make(chan *Result, resChanSize),
+               internal:    mo.internal,
+               Replicator:  mo.replicator,
        }
        return em
 }
@@ -117,7 +122,8 @@ type Manager interface {
 }
 
 type ManagerImpl struct {
-       events chan *Event
+       events      chan *Event
+       batchEvents chan []*Event
 
        internal time.Duration
        ticker   *time.Ticker
@@ -238,12 +244,15 @@ func (s syncEvents) Swap(i, j int) {
 
 func (e *ManagerImpl) HandleEvent() {
        gopool.Go(func(ctx context.Context) {
-               e.handleEvent(ctx)
+               e.handleBatchEvents(ctx)
+       })
+       gopool.Go(func(ctx context.Context) {
+               e.readAndPackEvents(ctx)
        })
 }
 
-func (e *ManagerImpl) handleEvent(ctx context.Context) {
-       events := make([]*Event, 0, 100)
+func (e *ManagerImpl) readAndPackEvents(ctx context.Context) {
+       events := make([]*Event, 0, eventSliceSize)
        e.ticker = time.NewTicker(e.internal)
        for {
                select {
@@ -253,8 +262,8 @@ func (e *ManagerImpl) handleEvent(ctx context.Context) {
                        }
                        send := events[:]
 
-                       events = make([]*Event, 0, 100)
-                       go e.handle(ctx, send)
+                       events = make([]*Event, 0, eventSliceSize)
+                       e.batchEvents <- send
                case event, ok := <-e.events:
                        if !ok {
                                return
@@ -263,9 +272,24 @@ func (e *ManagerImpl) handleEvent(ctx context.Context) {
                        events = append(events, event)
                        if len(events) > 50 {
                                send := events[:]
-                               events = make([]*Event, 0, 100)
-                               go e.handle(ctx, send)
+                               events = make([]*Event, 0, eventSliceSize)
+                               e.batchEvents <- send
+                       }
+               case <-ctx.Done():
+                       e.Close()
+                       return
+               }
+       }
+}
+
+func (e *ManagerImpl) handleBatchEvents(ctx context.Context) {
+       for {
+               select {
+               case send, ok := <-e.batchEvents:
+                       if !ok {
+                               return
                        }
+                       e.handle(ctx, send)
                case <-ctx.Done():
                        e.Close()
                        return
diff --git a/syncer/service/replicator/resource/config.go 
b/syncer/service/replicator/resource/config.go
index 80ac5b43..d431f5a2 100644
--- a/syncer/service/replicator/resource/config.go
+++ b/syncer/service/replicator/resource/config.go
@@ -101,7 +101,7 @@ func (c *kvConfig) NeedOperate(ctx context.Context) *Result 
{
                curNotNil: c.cur != nil,
                event:     c.event,
                updateTime: func() (int64, error) {
-                       return c.cur.UpdateTime, nil
+                       return secToNanoSec(c.cur.UpdateTime), nil
                },
                resourceID: kiedb.TombstoneID(c.input),
        }
diff --git a/syncer/service/replicator/resource/resource.go 
b/syncer/service/replicator/resource/resource.go
index 37d09b9c..fe1a128b 100644
--- a/syncer/service/replicator/resource/resource.go
+++ b/syncer/service/replicator/resource/resource.go
@@ -312,7 +312,11 @@ func formatUpdateTimeSecond(src string) (int64, error) {
                return 0, err
        }
 
-       return updateTime * 1000 * 1000 * 1000, nil
+       return secToNanoSec(updateTime), nil
+}
+
+func secToNanoSec(timestamp int64) int64 {
+       return timestamp * 1000 * 1000 * 1000
 }
 
 func (o *checker) needOperate(ctx context.Context) *Result {
diff --git a/syncer/service/task/manager.go b/syncer/service/task/manager.go
index 1072ca9e..e9cf5fb8 100644
--- a/syncer/service/task/manager.go
+++ b/syncer/service/task/manager.go
@@ -185,19 +185,27 @@ func (m *manager) ListTasks(ctx context.Context) 
([]*carisync.Task, error) {
 
        noHandleTasks := make([]*carisync.Task, 0, len(tasks))
        skipTaskIDs := make([]string, 0, len(tasks))
+       allTaskIDs := make(map[string]struct{}, len(tasks))
        for _, t := range tasks {
+               allTaskIDs[t.ID] = struct{}{}
                _, ok := m.cache.Load(t.ID)
                if ok {
                        skipTaskIDs = append(skipTaskIDs, t.ID)
                        continue
                }
                m.cache.Store(t.ID, t)
-
                noHandleTasks = append(noHandleTasks, t)
        }
+       m.cache.Range(func(key, value any) bool {
+               if _, ok := allTaskIDs[key.(string)]; !ok {
+                       m.cache.Delete(key)
+               }
+               return true
+       })
 
-       log.Info(fmt.Sprintf("load task raw count %d, to handle count %d, skip 
ids %v",
-               len(tasks), len(noHandleTasks), skipTaskIDs))
+       log.Info(fmt.Sprintf("load task raw count %d, to handle count %d",
+               len(tasks), len(noHandleTasks)))
+       log.Info(fmt.Sprintf("skip ids %v", skipTaskIDs))
 
        return noHandleTasks, nil
 }
@@ -259,7 +267,7 @@ func (m *manager) handleResult(res *event.Result) {
 
        log.Info(fmt.Sprintf("key: %s,result: %v", res.ID, res.Data))
 
-       t, ok := m.cache.LoadAndDelete(res.ID)
+       t, ok := m.cache.Load(res.ID)
        if !ok {
                return
        }

Reply via email to the mailing list.