This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a119bab  [DefaultRouter] fix unnecessary system clock reads due to races accessing router state (#694)
a119bab is described below
commit a119bab0f8598601c0eb7f0fcd97da7ab06700c7
Author: dferstay <[email protected]>
AuthorDate: Mon Jan 17 00:05:25 2022 -0800
[DefaultRouter] fix unnecessary system clock reads due to races accessing router state (#694)
Previously, we used atomic operations to read and update parts of the
default router state. Unfortunately, those reads and updates could race
under concurrent calls, leading to unnecessary clock reads and an
associated performance slowdown.
Now, we use atomic addition to increment the message count and batch size.
This removes the race condition by ensuring that each goroutine gets a
unique messageCount, and hence only one of them performs the clock read.
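As a minimal, illustrative sketch of that counting pattern (not code from
this commit; sampledCounter and readClockEvery are made-up names), atomic
addition hands every concurrent caller a distinct count, so at most one
caller per stride pays for the system clock read:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // sampledCounter hands each caller a distinct count via atomic addition.
    type sampledCounter struct {
        msgCounter uint32
    }

    // next increments the counter and reads the clock only when the count
    // lands on the sampling stride; because each count is unique, at most
    // one concurrent caller per stride performs the clock read.
    func (c *sampledCounter) next(readClockEvery uint32) (uint32, int64) {
        count := atomic.AddUint32(&c.msgCounter, 1)
        var now int64
        if readClockEvery == 0 || count%readClockEvery == 0 {
            now = time.Now().UnixNano()
        }
        return count, now
    }

    func main() {
        c := &sampledCounter{}
        for i := 0; i < 5; i++ {
            fmt.Println(c.next(2)) // the clock is read on counts 2 and 4 only
        }
    }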
Furthermore, we use atomic compare-and-swap to ensure that partitions are
not skipped when multiple goroutines attempt to increment the partition
cursor.
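And a sketch of the compare-and-swap pattern (again illustrative only;
advanceCursor is a made-up helper, not part of this commit): of the
goroutines that observed the same cursor value, only the CAS winner moves
it forward by one, while the losers keep the partition they observed, so
no partition is skipped:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // advanceCursor tries to move the cursor from the observed value to
    // observed+1. Exactly one racing goroutine wins the CAS; the rest keep
    // the cursor they observed, so the cursor never jumps by more than one.
    func advanceCursor(cursor *uint32, observed uint32) uint32 {
        next := observed + 1
        if atomic.CompareAndSwapUint32(cursor, observed, next) {
            return next // won the race; cursor advanced by exactly one
        }
        return observed // lost the race; route to the partition we observed
    }

    func main() {
        var cursor uint32
        observed := atomic.LoadUint32(&cursor)
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                advanceCursor(&cursor, observed)
            }()
        }
        wg.Wait()
        fmt.Println(atomic.LoadUint32(&cursor)) // prints 1, never 4
    }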
Signed-off-by: Daniel Ferstay <[email protected]>
Co-authored-by: Daniel Ferstay <[email protected]>
---
pulsar/default_router.go | 50 +++++++++++++++++++++----------------------
pulsar/default_router_test.go | 13 +++++++----
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index b5e24a6..6945ff1 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-    "math"
     "math/rand"
     "sync/atomic"
     "time"
@@ -27,7 +26,7 @@ import (
 
 type defaultRouter struct {
     currentPartitionCursor uint32
-    lastChangeTimestamp    int64
+    lastBatchTimestamp     int64
     msgCounter             uint32
     cumulativeBatchSize    uint32
 }
@@ -45,7 +44,7 @@ func NewDefaultRouter(
     disableBatching bool) func(*ProducerMessage, uint32) int {
     state := &defaultRouter{
         currentPartitionCursor: rand.Uint32(),
-        lastChangeTimestamp:    math.MinInt64,
+        lastBatchTimestamp:     time.Now().UnixNano(),
     }
 
     readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
@@ -75,37 +74,38 @@ func NewDefaultRouter(
         // If there's no key, we do round-robin across partition, sticking with a given
         // partition for a certain amount of messages or volume buffered or the max delay to batch is reached so that
         // we ensure having a decent amount of batching of the messages.
-        // Note that it is possible that we skip more than one partition if multiple goroutines increment
-        // currentPartitionCursor at the same time. If that happens it shouldn't be a problem because we only want to
-        // spread the data on different partitions but not necessarily in a specific sequence.
         var now int64
         size := uint32(len(message.Payload))
-        previousMessageCount := atomic.LoadUint32(&state.msgCounter)
-        previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize)
-        previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp)
+        partitionCursor := atomic.LoadUint32(&state.currentPartitionCursor)
+        messageCount := atomic.AddUint32(&state.msgCounter, 1)
+        batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
 
-        messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
-        sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
+        // Note: use greater-than for the threshold check so that we don't route this message to a new partition
+        // before a batch is complete.
+        messageCountReached := messageCount > uint32(maxBatchingMessages)
+        sizeReached := batchSize > uint32(maxBatchingSize)
         durationReached := false
-        if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 {
+        if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 {
             now = time.Now().UnixNano()
-            durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds()
+            lastBatchTime := atomic.LoadInt64(&state.lastBatchTimestamp)
+            durationReached = now-lastBatchTime > maxBatchingDelay.Nanoseconds()
         }
 
         if messageCountReached || sizeReached || durationReached {
-            atomic.AddUint32(&state.currentPartitionCursor, 1)
-            atomic.StoreUint32(&state.msgCounter, 0)
-            atomic.StoreUint32(&state.cumulativeBatchSize, 0)
-            if now != 0 {
-                atomic.StoreInt64(&state.lastChangeTimestamp, now)
+            // Note: CAS to ensure that concurrent go-routines can only move the cursor forward by one so that
+            // partitions are not skipped.
+            newCursor := partitionCursor + 1
+            if atomic.CompareAndSwapUint32(&state.currentPartitionCursor, partitionCursor, newCursor) {
+                atomic.StoreUint32(&state.msgCounter, 0)
+                atomic.StoreUint32(&state.cumulativeBatchSize, 0)
+                if now == 0 {
+                    now = time.Now().UnixNano()
+                }
+                atomic.StoreInt64(&state.lastBatchTimestamp, now)
             }
-            return int(state.currentPartitionCursor % numPartitions)
-        }
-        atomic.AddUint32(&state.msgCounter, 1)
-        atomic.AddUint32(&state.cumulativeBatchSize, size)
-        if now != 0 {
-            atomic.StoreInt64(&state.lastChangeTimestamp, now)
+            return int(newCursor % numPartitions)
         }
-        return int(state.currentPartitionCursor % numPartitions)
+
+        return int(partitionCursor % numPartitions)
     }
 }
diff --git a/pulsar/default_router_test.go b/pulsar/default_router_test.go
index 31b27af..3c42e66 100644
--- a/pulsar/default_router_test.go
+++ b/pulsar/default_router_test.go
@@ -71,16 +71,21 @@ func TestDefaultRouterRoutingBecauseMaxNumberOfMessagesReached(t *testing.T) {
     const numPartitions = uint32(3)
     p1 := router(&ProducerMessage{
         Payload: []byte("message 1"),
-    }, 3)
+    }, numPartitions)
     assert.LessOrEqual(t, p1, int(numPartitions))
 
     p2 := router(&ProducerMessage{
         Payload: []byte("message 2"),
     }, numPartitions)
-    if p1 == int(numPartitions-1) {
-        assert.Equal(t, 0, p2)
+    assert.Equal(t, p1, p2)
+
+    p3 := router(&ProducerMessage{
+        Payload: []byte("message 3"),
+    }, numPartitions)
+    if p2 == int(numPartitions-1) {
+        assert.Equal(t, 0, p3)
     } else {
-        assert.Equal(t, p1+1, p2)
+        assert.Equal(t, p2+1, p3)
     }
 }