This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 1a860c8  ring buffer (#40)
1a860c8 is described below

commit 1a860c8f1e6f60da7f1444242d3e039bc432170d
Author: askuy <[email protected]>
AuthorDate: Tue Mar 12 20:50:43 2019 +0800

    ring buffer (#40)
---
 utils/ring_buffer.go      | 187 +++++++++++++++++++++++++++++++++++++++-------
 utils/ring_buffer_test.go | 126 +++++++++++++++++++++++++++++++
 2 files changed, 288 insertions(+), 25 deletions(-)

diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
index 66a1842..9400f53 100644
--- a/utils/ring_buffer.go
+++ b/utils/ring_buffer.go
@@ -17,48 +17,185 @@
 
 package utils
 
-import "sync"
-
-type RingBuffer struct {
-       buf      []byte
-       writePos int
-       readPos  int
-       cap      int
-       rwMutex  sync.RWMutex
-       exitCh   chan interface{}
+import (
+       "runtime"
+       "time"
+       "sync/atomic"
+)
+
+// 1.需要能够动态扩容
+// 2.缩容看情况
+// 3.read的时候需要block
+// 4.线程安全
+type RingNodesBuffer struct {
+       writePos uint64
+       readPos  uint64
+       mask  uint64
+
+       nodes nodes
+}
+
+type node struct {
+       position uint64
+       buf     []byte
 }
 
-func NewRingBuffer(cap int) *RingBuffer {
-       rb := &RingBuffer{buf: make([]byte, cap), cap: cap}
-       go rb.resize()
+type nodes []*node
+
+// roundUp takes a uint64 greater than 0 and rounds it up to the next
+// power of 2.
+func roundUp(v uint64) uint64 {
+       v--
+       v |= v >> 1
+       v |= v >> 2
+       v |= v >> 4
+       v |= v >> 8
+       v |= v >> 16
+       v |= v >> 32
+       v++
+       return v
+}
+
+func (rb *RingNodesBuffer) init(size uint64) {
+       size = roundUp(size)
+       rb.nodes = make(nodes, size)
+       for i := uint64(0); i < size; i++ {
+               rb.nodes[i] = &node{position: i}
+       }
+       rb.mask = size - 1 // so we don't have to do this with every put/get operation
+}
+
+func NewRingNodesBuffer(cap uint64) *RingNodesBuffer {
+       rb := &RingNodesBuffer{}
+       rb.init(cap)
+       //go rb.resize()
        return rb
 }
 
-func (r *RingBuffer) Write(b []byte) error {
-       // TODO
+func (r *RingNodesBuffer) Write(b []byte) error {
+       var n *node
+       var dif uint64
+       pos := atomic.LoadUint64(&r.writePos)
+       i := 0
+L:
+       for {
+               // pos 16 seq 1.     0001 0000    00001111   0001 1111
+               n = r.nodes[pos&r.mask]
+               seq := atomic.LoadUint64(&n.position)
+               switch dif = seq - pos; {
+               case dif == 0:
+                       if atomic.CompareAndSwapUint64(&r.writePos, pos, pos+1) {
+                               break L
+                       }
+               default:
+                       pos = atomic.LoadUint64(&r.writePos)
+               }
+               if i == 10000 {
+                       runtime.Gosched() // free up the cpu before the next iteration
+                       i = 0
+               } else {
+                       i++
+               }
+       }
+
+       n.buf = b
+       atomic.StoreUint64(&n.position, pos+1)
        return nil
 }
 
-func (r *RingBuffer) Read(p []byte) (n int, err error) {
-
-       if r.Size() >= len(p) {
-               copy(p, r.buf[r.readPos:r.readPos+len(p)])
-               r.readPos += len(p)
+// 直接返回数据
+func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) {
+       var (
+               node     *node
+               pos   = atomic.LoadUint64(&r.readPos)
+               start time.Time
+               dif uint64
+       )
+       if timeout > 0 {
+               start = time.Now()
+       }
+       i := 0
+L:
+       for {
+               node = r.nodes[pos&r.mask]
+               seq := atomic.LoadUint64(&node.position)
+               switch dif = seq - (pos + 1); {
+               case dif == 0:
+                       if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
+                               break L
+                       }
+               default:
+                       pos = atomic.LoadUint64(&r.readPos)
+               }
+               if timeout > 0 && time.Since(start) >= timeout {
+                       return
+               }
+               if i == 10000 {
+                       runtime.Gosched() // free up the cpu before the next iteration
+                       i = 0
+               } else {
+                       i++
+               }
+       }
+       data = node.buf
+       atomic.StoreUint64(&node.position, pos+r.mask+1)
+       return
+}
 
+// 知道大小,传进去解析
+func (r *RingNodesBuffer) ReadBySize(data []byte,timeout time.Duration) (n int, err error) {
+       var (
+               node     *node
+               pos   = atomic.LoadUint64(&r.readPos)
+               start time.Time
+               dif uint64
+       )
+       i := 0
+       if timeout > 0 {
+               start = time.Now()
+       }
+L:
+       for {
+               node = r.nodes[pos&r.mask]
+               seq := atomic.LoadUint64(&node.position)
+               switch dif = seq - (pos + 1); {
+               case dif == 0:
+                       if atomic.CompareAndSwapUint64(&r.readPos, pos, pos+1) {
+                               break L
+                       }
+               default:
+                       pos = atomic.LoadUint64(&r.readPos)
+               }
+               if timeout > 0 && time.Since(start) >= timeout {
+                       return
+               }
+               if i == 10000 {
+                       runtime.Gosched() // free up the cpu before the next iteration
+                       i = 0
+               } else {
+                       i++
+               }
        }
+       n = copy(data,node.buf)
+       atomic.StoreUint64(&node.position, pos+r.mask+1)
+       return
+}
+
+
+func (r *RingNodesBuffer) Size() uint64 {
+       return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos)
 
-       // TODO waiting data...
-       return 0, err
 }
 
-func (r *RingBuffer) Size() int {
-       return r.writePos - r.readPos
+// Cap returns the capacity of this ring buffer.
+func (rb *RingNodesBuffer) Cap() uint64 {
+       return uint64(len(rb.nodes))
 }
 
-func (r *RingBuffer) Destroy() {
+func (r *RingNodesBuffer) Destroy() {
 
 }
 
-func (r *RingBuffer) resize() {
+func (r *RingNodesBuffer) resize() {
        // TODO
 }
diff --git a/utils/ring_buffer_test.go b/utils/ring_buffer_test.go
new file mode 100644
index 0000000..a119f21
--- /dev/null
+++ b/utils/ring_buffer_test.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package utils
+
+import (
+       "time"
+       "testing"
+       "github.com/stretchr/testify/assert"
+       "sync"
+       "strconv"
+       "fmt"
+)
+
+func TestRingRead(t *testing.T)  {
+       rb := NewRingNodesBuffer(5)
+       assert.Equal(t, uint64(8), rb.Cap())
+
+       err := rb.Write([]byte("hello"))
+       if !assert.Nil(t, err) {
+               return
+       }
+       data, err := rb.Read(1*time.Second)
+       if !assert.Nil(t, err) {
+               return
+       }
+
+       assert.Equal(t, "hello", string(data))
+}
+
+
+func TestRingReadBySize(t *testing.T)  {
+       rb := NewRingNodesBuffer(5)
+       assert.Equal(t, uint64(8), rb.Cap())
+
+       err := rb.Write([]byte("hello"))
+       if !assert.Nil(t, err) {
+               return
+       }
+       sink := make([]byte, 5)
+       n, err := rb.ReadBySize(sink,1*time.Second)
+       if !assert.Nil(t, err) {
+               return
+       }
+
+       assert.Equal(t, 5, n)
+       assert.Equal(t, "hello", string(sink))
+}
+
+func BenchmarkRingReadBufferMPMC(b *testing.B) {
+       q := NewRingNodesBuffer(uint64(b.N * 100))
+       var wg sync.WaitGroup
+       wg.Add(100)
+       b.ResetTimer()
+       b.ReportAllocs()
+
+
+       for i := 0; i < 100; i++ {
+               go func() {
+                       for i := 0; i < b.N; i++ {
+                               q.Write([]byte(strconv.Itoa(i)))
+                       }
+               }()
+       }
+
+       for i := 0; i < 100; i++ {
+               go func() {
+                       for i := 0; i < b.N; i++ {
+                               _ = len(strconv.Itoa(i))
+                               var p []byte
+                               p,_ = q.Read(1*time.Second)
+                               fmt.Sprintf("%v",p)
+
+                       }
+                       wg.Done()
+               }()
+       }
+
+       wg.Wait()
+}
+
+
+
+func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
+       q := NewRingNodesBuffer(uint64(b.N * 100))
+       var wg sync.WaitGroup
+       wg.Add(100)
+       b.ResetTimer()
+       b.ReportAllocs()
+
+
+       for i := 0; i < 100; i++ {
+               go func() {
+                       for i := 0; i < b.N; i++ {
+                               q.Write([]byte(strconv.Itoa(i)))
+                       }
+               }()
+       }
+
+       for i := 0; i < 100; i++ {
+               go func() {
+                       for i := 0; i < b.N; i++ {
+                               p := make([]byte,len(strconv.Itoa(i)))
+                               q.ReadBySize(p,1*time.Second)
+                               fmt.Sprintf("%v",p)
+                       }
+                       wg.Done()
+               }()
+       }
+
+       wg.Wait()
+}
\ No newline at end of file

Reply via email to