This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 5f00b3aa [fix] inconsistency of config items while synchronizing data
between … (#1359)
5f00b3aa is described below
commit 5f00b3aafeaaefe7162398238ad68127de117ff5
Author: kkf1 <[email protected]>
AuthorDate: Wed Nov 9 14:14:51 2022 +0800
[fix] inconsistency of config items while synchronizing data between …
(#1359)
* [fix] inconsistency of config items while synchronizing data between two
clusters
* [fix] inconsistency of config items while synchronizing data between two
clusters
---
syncer/service/event/manager.go | 50 +++++++++++++++++++-------
syncer/service/replicator/resource/config.go | 2 +-
syncer/service/replicator/resource/resource.go | 6 +++-
syncer/service/task/manager.go | 16 ++++++---
4 files changed, 55 insertions(+), 19 deletions(-)
diff --git a/syncer/service/event/manager.go b/syncer/service/event/manager.go
index 6c3b04ff..01ff0d23 100644
--- a/syncer/service/event/manager.go
+++ b/syncer/service/event/manager.go
@@ -34,7 +34,11 @@ import (
)
const (
- DefaultInternal = 500 * time.Millisecond
+ DefaultInternal = 500 * time.Millisecond
+ eventChanSize = 1000
+ batchEventChanSize = 100
+ resChanSize = 1000
+ eventSliceSize = 100
)
var m Manager
@@ -95,10 +99,11 @@ func Replicator(r replicator.Replicator) ManagerOption {
func NewManager(os ...ManagerOption) Manager {
mo := toManagerOptions(os...)
em := &ManagerImpl{
- events: make(chan *Event, 1000),
- result: make(chan *Result, 1000),
- internal: mo.internal,
- Replicator: mo.replicator,
+ events: make(chan *Event, eventChanSize),
+ batchEvents: make(chan []*Event, batchEventChanSize),
+ result: make(chan *Result, resChanSize),
+ internal: mo.internal,
+ Replicator: mo.replicator,
}
return em
}
@@ -117,7 +122,8 @@ type Manager interface {
}
type ManagerImpl struct {
- events chan *Event
+ events chan *Event
+ batchEvents chan []*Event
internal time.Duration
ticker *time.Ticker
@@ -238,12 +244,15 @@ func (s syncEvents) Swap(i, j int) {
func (e *ManagerImpl) HandleEvent() {
gopool.Go(func(ctx context.Context) {
- e.handleEvent(ctx)
+ e.handleBatchEvents(ctx)
+ })
+ gopool.Go(func(ctx context.Context) {
+ e.readAndPackEvents(ctx)
})
}
-func (e *ManagerImpl) handleEvent(ctx context.Context) {
- events := make([]*Event, 0, 100)
+func (e *ManagerImpl) readAndPackEvents(ctx context.Context) {
+ events := make([]*Event, 0, eventSliceSize)
e.ticker = time.NewTicker(e.internal)
for {
select {
@@ -253,8 +262,8 @@ func (e *ManagerImpl) handleEvent(ctx context.Context) {
}
send := events[:]
- events = make([]*Event, 0, 100)
- go e.handle(ctx, send)
+ events = make([]*Event, 0, eventSliceSize)
+ e.batchEvents <- send
case event, ok := <-e.events:
if !ok {
return
@@ -263,9 +272,24 @@ func (e *ManagerImpl) handleEvent(ctx context.Context) {
events = append(events, event)
if len(events) > 50 {
send := events[:]
- events = make([]*Event, 0, 100)
- go e.handle(ctx, send)
+ events = make([]*Event, 0, eventSliceSize)
+ e.batchEvents <- send
+ }
+ case <-ctx.Done():
+ e.Close()
+ return
+ }
+ }
+}
+
+func (e *ManagerImpl) handleBatchEvents(ctx context.Context) {
+ for {
+ select {
+ case send, ok := <-e.batchEvents:
+ if !ok {
+ return
}
+ e.handle(ctx, send)
case <-ctx.Done():
e.Close()
return
diff --git a/syncer/service/replicator/resource/config.go
b/syncer/service/replicator/resource/config.go
index 80ac5b43..d431f5a2 100644
--- a/syncer/service/replicator/resource/config.go
+++ b/syncer/service/replicator/resource/config.go
@@ -101,7 +101,7 @@ func (c *kvConfig) NeedOperate(ctx context.Context) *Result
{
curNotNil: c.cur != nil,
event: c.event,
updateTime: func() (int64, error) {
- return c.cur.UpdateTime, nil
+ return secToNanoSec(c.cur.UpdateTime), nil
},
resourceID: kiedb.TombstoneID(c.input),
}
diff --git a/syncer/service/replicator/resource/resource.go
b/syncer/service/replicator/resource/resource.go
index 37d09b9c..fe1a128b 100644
--- a/syncer/service/replicator/resource/resource.go
+++ b/syncer/service/replicator/resource/resource.go
@@ -312,7 +312,11 @@ func formatUpdateTimeSecond(src string) (int64, error) {
return 0, err
}
- return updateTime * 1000 * 1000 * 1000, nil
+ return secToNanoSec(updateTime), nil
+}
+
+func secToNanoSec(timestamp int64) int64 {
+ return timestamp * 1000 * 1000 * 1000
}
func (o *checker) needOperate(ctx context.Context) *Result {
diff --git a/syncer/service/task/manager.go b/syncer/service/task/manager.go
index 1072ca9e..e9cf5fb8 100644
--- a/syncer/service/task/manager.go
+++ b/syncer/service/task/manager.go
@@ -185,19 +185,27 @@ func (m *manager) ListTasks(ctx context.Context)
([]*carisync.Task, error) {
noHandleTasks := make([]*carisync.Task, 0, len(tasks))
skipTaskIDs := make([]string, 0, len(tasks))
+ allTaskIDs := make(map[string]struct{}, len(tasks))
for _, t := range tasks {
+ allTaskIDs[t.ID] = struct{}{}
_, ok := m.cache.Load(t.ID)
if ok {
skipTaskIDs = append(skipTaskIDs, t.ID)
continue
}
m.cache.Store(t.ID, t)
-
noHandleTasks = append(noHandleTasks, t)
}
+ m.cache.Range(func(key, value any) bool {
+ if _, ok := allTaskIDs[key.(string)]; !ok {
+ m.cache.Delete(key)
+ }
+ return true
+ })
- log.Info(fmt.Sprintf("load task raw count %d, to handle count %d, skip
ids %v",
- len(tasks), len(noHandleTasks), skipTaskIDs))
+ log.Info(fmt.Sprintf("load task raw count %d, to handle count %d",
+ len(tasks), len(noHandleTasks)))
+ log.Info(fmt.Sprintf("skip ids %v", skipTaskIDs))
return noHandleTasks, nil
}
@@ -259,7 +267,7 @@ func (m *manager) handleResult(res *event.Result) {
log.Info(fmt.Sprintf("key: %s,result: %v", res.ID, res.Data))
- t, ok := m.cache.LoadAndDelete(res.ID)
+ t, ok := m.cache.Load(res.ID)
if !ok {
return
}