Repository: incubator-htrace
Updated Branches:
  refs/heads/master c715e12eb -> fe19368a3


HTRACE-301. htraced: fix unit tests that aren't waiting for spans to be 
written, use semaphore for WrittenSpans (Colin Patrick McCabe via iwasakims)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/fe19368a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/fe19368a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/fe19368a

Branch: refs/heads/master
Commit: fe19368a3cb46595fc511a28ef5d0b7df26abb21
Parents: c715e12
Author: Masatake Iwasaki <[email protected]>
Authored: Fri Nov 20 11:20:06 2015 +0900
Committer: Masatake Iwasaki <[email protected]>
Committed: Fri Nov 20 11:20:06 2015 +0900

----------------------------------------------------------------------
 .../src/org/apache/htrace/common/semaphore.go   | 78 ++++++++++++++++++
 .../org/apache/htrace/common/semaphore_test.go  | 86 ++++++++++++++++++++
 .../org/apache/htrace/htraced/client_test.go    | 25 ++++--
 .../src/org/apache/htrace/htraced/datastore.go  | 32 ++++----
 .../org/apache/htrace/htraced/datastore_test.go | 71 ++++++++--------
 .../org/apache/htrace/htraced/mini_htraced.go   |  4 +-
 .../org/apache/htrace/htraced/reaper_test.go    |  8 +-
 7 files changed, 238 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go 
b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
new file mode 100644
index 0000000..1d56ae9
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+       "sync"
+)
+
+// A simple lock-and-condition-variable based semaphore implementation.
+type Semaphore struct {
+       lock sync.Mutex
+       cond *sync.Cond
+       count int64
+}
+
+func NewSemaphore(count int64) *Semaphore {
+       sem := &Semaphore {
+               count:int64(count),
+       }
+       sem.cond = &sync.Cond {
+               L: &sem.lock,
+       }
+       return sem
+}
+
+func (sem *Semaphore) Post() {
+       sem.lock.Lock()
+       sem.count++
+       if sem.count > 0 {
+               sem.cond.Broadcast()
+       }
+       sem.lock.Unlock()
+}
+
+func (sem *Semaphore) Posts(amt int64) {
+       sem.lock.Lock()
+       sem.count+=amt
+       if sem.count > 0 {
+               sem.cond.Broadcast()
+       }
+       sem.lock.Unlock()
+}
+
+func (sem *Semaphore) Wait() {
+       sem.lock.Lock()
+       for {
+               if sem.count > 0 {
+                       sem.count--
+                       sem.lock.Unlock()
+                       return
+               }
+               sem.cond.Wait()
+       }
+}
+
+func (sem *Semaphore) Waits(amt int64) {
+       var i int64
+       for i=0; i<amt; i++ {
+               sem.Wait()
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
new file mode 100644
index 0000000..089c51b
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+import (
+       "sync/atomic"
+       "testing"
+       "time"
+)
+
+func TestSemaphoreWake(t *testing.T) {
+       var done uint32
+       sem := NewSemaphore(0)
+       go func() {
+               time.Sleep(10 * time.Nanosecond)
+               atomic.AddUint32(&done, 1)
+               sem.Post()
+       }()
+       sem.Wait()
+       doneVal := atomic.LoadUint32(&done)
+       if doneVal != 1 {
+               t.Fatalf("sem.Wait did not wait for sem.Post")
+       }
+}
+
+func TestSemaphoreCount(t *testing.T) {
+       sem := NewSemaphore(1)
+       sem.Post()
+       sem.Wait()
+       sem.Wait()
+
+       sem = NewSemaphore(-1)
+       sem.Post()
+       sem.Post()
+       sem.Wait()
+}
+
+func TestSemaphoreMultipleGoroutines(t *testing.T) {
+       var done uint32
+       sem := NewSemaphore(0)
+       sem2 := NewSemaphore(0)
+       go func() {
+               sem.Wait()
+               atomic.AddUint32(&done, 1)
+               sem2.Post()
+       }()
+       go func() {
+               time.Sleep(10 * time.Nanosecond)
+               atomic.AddUint32(&done, 1)
+               sem.Post()
+       }()
+       go func() {
+               time.Sleep(20 * time.Nanosecond)
+               atomic.AddUint32(&done, 1)
+               sem.Post()
+       }()
+       sem.Wait()
+       go func() {
+               time.Sleep(10 * time.Nanosecond)
+               atomic.AddUint32(&done, 1)
+               sem.Post()
+       }()
+       sem.Wait()
+       sem2.Wait()
+       doneVal := atomic.LoadUint32(&done)
+       if doneVal != 4 {
+               t.Fatalf("sem.Wait did not wait for sem.Posts")
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index e4f2151..fae871c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -46,6 +46,7 @@ func TestClientGetServerVersion(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
+       defer hcl.Close()
        _, err = hcl.GetServerVersion()
        if err != nil {
                t.Fatalf("failed to call GetServerVersion: %s", err.Error())
@@ -91,7 +92,9 @@ func createRandomTestSpans(amount int) common.SpanSlice {
 
 func TestClientOperations(t *testing.T) {
        htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations",
-               DataDirs: make([]string, 2)}
+               DataDirs: make([]string, 2),
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                t.Fatalf("failed to create datastore: %s", err.Error())
@@ -102,6 +105,7 @@ func TestClientOperations(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
+       defer hcl.Close()
 
        // Create some random trace spans.
        NUM_TEST_SPANS := 30
@@ -115,6 +119,7 @@ func TestClientOperations(t *testing.T) {
                t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
                        err.Error())
        }
+       ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS/2))
 
        // Look up the first half of the spans.  They should be found.
        var span *common.Span
@@ -181,7 +186,12 @@ func TestClientOperations(t *testing.T) {
 
 func TestDumpAll(t *testing.T) {
        htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll",
-               DataDirs: make([]string, 2)}
+               DataDirs: make([]string, 2),
+               WrittenSpans: common.NewSemaphore(0),
+               Cnf: map[string]string{
+                       conf.HTRACE_LOG_LEVEL: "INFO",
+               },
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                t.Fatalf("failed to create datastore: %s", err.Error())
@@ -192,6 +202,7 @@ func TestDumpAll(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
+       defer hcl.Close()
 
        NUM_TEST_SPANS := 100
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
@@ -202,7 +213,8 @@ func TestDumpAll(t *testing.T) {
        if err != nil {
                t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }
-       out := make(chan *common.Span, 50)
+       ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
+       out := make(chan *common.Span, NUM_TEST_SPANS)
        var dumpErr error
        go func() {
                dumpErr = hcl.DumpAll(3, out)
@@ -253,6 +265,7 @@ func TestClientGetServerConf(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
+       defer hcl.Close()
        serverCnf, err2 := hcl.GetServerConf()
        if err2 != nil {
                t.Fatalf("failed to call GetServerConf: %s", err2.Error())
@@ -293,7 +306,7 @@ func TestHrpcAdmissionsControl(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", 
TEST_NUM_HRPC_HANDLERS),
                },
-               WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS),
+               WrittenSpans: common.NewSemaphore(0),
                HrpcTestHooks: testHooks,
        }
        ht, err := htraceBld.Build()
@@ -319,9 +332,7 @@ func TestHrpcAdmissionsControl(t *testing.T) {
                }(iter)
        }
        wg.Wait()
-       for i := 0; i < TEST_NUM_WRITESPANS; i++ {
-               <-ht.Store.WrittenSpans
-       }
+       ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS))
 }
 
 // Tests that HRPC I/O timeouts work.

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index a4bb320..8cd1526 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -108,8 +108,8 @@ type shard struct {
        // A channel for incoming heartbeats
        heartbeats chan interface{}
 
-       // The channel we will send a bool to when we exit.
-       exited chan bool
+       // Tracks whether the shard goroutine has exited.
+       exited sync.WaitGroup
 
        // Per-address metrics
        mtxMap ServerSpanMetricsMap
@@ -123,7 +123,7 @@ func (shd *shard) processIncoming() {
        lg := shd.store.lg
        defer func() {
                lg.Infof("Shard processor for %s exiting.\n", shd.path)
-               shd.exited <- true
+               shd.exited.Done()
        }()
        for {
                select {
@@ -289,8 +289,7 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
        }
        shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
        if shd.store.WrittenSpans != nil {
-               shd.store.lg.Errorf("WATERMELON: Sending span to 
shd.store.WrittenSpans\n")
-               shd.store.WrittenSpans <- span
+               shd.store.WrittenSpans.Post()
        }
        return nil
 }
@@ -325,9 +324,7 @@ func (shd *shard) Close() {
        lg := shd.store.lg
        shd.incoming <- nil
        lg.Infof("Waiting for %s to exit...\n", shd.path)
-       if shd.exited != nil {
-               <-shd.exited
-       }
+       shd.exited.Wait()
        shd.ldb.Close()
        lg.Infof("Closed %s...\n", shd.path)
 }
@@ -345,8 +342,8 @@ type Reaper struct {
        // A channel used to send heartbeats to the reaper
        heartbeats chan interface{}
 
-       // A channel used to block until the reaper goroutine has exited.
-       exited chan interface{}
+       // Tracks whether the reaper goroutine has exited
+       exited sync.WaitGroup
 
        // The lock protecting reaper data.
        lock sync.Mutex
@@ -363,7 +360,6 @@ func NewReaper(cnf *conf.Config) *Reaper {
                lg:           common.NewLogger("reaper", cnf),
                spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS),
                heartbeats:   make(chan interface{}, 1),
-               exited:       make(chan interface{}),
        }
        if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS {
                rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS
@@ -372,6 +368,7 @@ func NewReaper(cnf *conf.Config) *Reaper {
        }
        rpr.hb = NewHeartbeater("ReaperHeartbeater",
                cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg)
+       rpr.exited.Add(1)
        go rpr.run()
        rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{
                name:       "reaper",
@@ -390,7 +387,7 @@ func NewReaper(cnf *conf.Config) *Reaper {
 func (rpr *Reaper) run() {
        defer func() {
                rpr.lg.Info("Exiting Reaper goroutine.\n")
-               rpr.exited <- nil
+               rpr.exited.Done()
        }()
 
        for {
@@ -442,7 +439,6 @@ func (rpr *Reaper) SetReaperDate(rdate int64) {
 func (rpr *Reaper) Shutdown() {
        rpr.hb.Shutdown()
        close(rpr.heartbeats)
-       <-rpr.exited
 }
 
 // The Data Store.
@@ -458,9 +454,9 @@ type dataStore struct {
        // The write options to use for LevelDB.
        writeOpts *levigo.WriteOptions
 
-       // If non-null, a channel we will send spans to once we finish writing 
them.  This is only used
-       // for testing.
-       WrittenSpans chan *common.Span
+       // If non-null, a semaphore we will increment once for each span we 
receive.
+       // Used for testing.
+       WrittenSpans *common.Semaphore
 
        // The metrics sink.
        msink *MetricsSink
@@ -475,7 +471,7 @@ type dataStore struct {
        startMs int64
 }
 
-func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) 
(*dataStore, error) {
+func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) 
(*dataStore, error) {
        // Get the configuration.
        clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
        dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
@@ -513,10 +509,10 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan 
*common.Span) (*dataSto
        store.rpr = NewReaper(cnf)
        for idx := range store.shards {
                shd := store.shards[idx]
-               shd.exited = make(chan bool, 1)
                shd.heartbeats = make(chan interface{}, 1)
                shd.mtxMap = make(ServerSpanMetricsMap)
                shd.maxMtx = store.msink.maxMtx
+               shd.exited.Add(1)
                go shd.processIncoming()
        }
        store.hb = NewHeartbeater("DatastoreHeartbeater",

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 0443834..d9f4a0a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -31,6 +31,7 @@ import (
        "sort"
        "strings"
        "testing"
+       "time"
 )
 
 // Test creating and tearing down a datastore.
@@ -74,14 +75,11 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{
 func createSpans(spans []common.Span, store *dataStore) {
        for idx := range spans {
                store.WriteSpan(&IncomingSpan{
-                       Addr: "127.0.0.1:1234",
+                       Addr: "127.0.0.1",
                        Span: &spans[idx],
                })
        }
-       // Wait the spans to be created
-       for i := 0; i < len(spans); i++ {
-               <-store.WrittenSpans
-       }
+       store.WrittenSpans.Waits(int64(len(spans)))
 }
 
 // Test creating a datastore and adding some spans.
@@ -91,7 +89,8 @@ func TestDatastoreWriteAndRead(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, 100)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                panic(err)
@@ -99,12 +98,6 @@ func TestDatastoreWriteAndRead(t *testing.T) {
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
 
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1:1234": &common.SpanMetrics{
-                       Written: uint64(len(SIMPLE_TEST_SPANS)),
-               },
-       })
-
        span := 
ht.Store.FindSpan(common.TestId("00000000000000000000000000000001"))
        if span == nil {
                t.Fatal()
@@ -160,7 +153,8 @@ func TestSimpleQuery(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, 100)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                panic(err)
@@ -168,7 +162,7 @@ func TestSimpleQuery(t *testing.T) {
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
        waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1:1234": &common.SpanMetrics{
+               "127.0.0.1": &common.SpanMetrics{
                        Written: uint64(len(SIMPLE_TEST_SPANS)),
                },
        })
@@ -191,7 +185,8 @@ func TestQueries2(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, 100)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                panic(err)
@@ -199,7 +194,7 @@ func TestQueries2(t *testing.T) {
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
        waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1:1234": &common.SpanMetrics{
+               "127.0.0.1": &common.SpanMetrics{
                        Written: uint64(len(SIMPLE_TEST_SPANS)),
                },
        })
@@ -248,7 +243,8 @@ func TestQueries3(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, 100)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                panic(err)
@@ -256,7 +252,7 @@ func TestQueries3(t *testing.T) {
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
        waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1:1234": &common.SpanMetrics{
+               "127.0.0.1": &common.SpanMetrics{
                        Written: uint64(len(SIMPLE_TEST_SPANS)),
                },
        })
@@ -305,18 +301,15 @@ func TestQueries4(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, 100)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                panic(err)
        }
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
-       waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1:1234": &common.SpanMetrics{
-                       Written: uint64(len(SIMPLE_TEST_SPANS)),
-               },
-       })
+
        testQuery(t, ht, &common.Query{
                Predicates: []common.Predicate{
                        common.Predicate{
@@ -355,13 +348,20 @@ func BenchmarkDatastoreWrites(b *testing.B) {
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000",
                        conf.HTRACE_LOG_LEVEL: "INFO",
                },
-               WrittenSpans: make(chan *common.Span, b.N)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
-               panic(err)
+               b.Fatalf("Error creating MiniHTraced: %s\n", err.Error())
        }
-       defer ht.Close()
-       rnd := rand.New(rand.NewSource(1))
+       ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N)
+       defer func() {
+               if r := recover(); r != nil {
+                       ht.Store.lg.Infof("panic: %s\n", r.(error))
+               }
+               ht.Close()
+       }()
+       rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
        allSpans := make([]*common.Span, b.N)
        for n := range(allSpans) {
                allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n])
@@ -379,9 +379,7 @@ func BenchmarkDatastoreWrites(b *testing.B) {
                })
        }
        // Wait for all the spans to be written.
-       for n := 0; n < b.N; n++ {
-               <-ht.Store.WrittenSpans
-       }
+       ht.Store.WrittenSpans.Waits(int64(b.N))
        waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
                "127.0.0.1": &common.SpanMetrics{
                        Written: uint64(b.N), // should be less than?
@@ -394,7 +392,10 @@ func TestReloadDataStore(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               DataDirs: make([]string, 2), KeepDataDirsOnClose: true}
+               DataDirs: make([]string, 2),
+               KeepDataDirsOnClose: true,
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                t.Fatalf("failed to create datastore: %s", err.Error())
@@ -424,6 +425,7 @@ func TestReloadDataStore(t *testing.T) {
        if err != nil {
                t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }
+       ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS))
 
        // Look up the spans we wrote.
        var span *common.Span
@@ -494,7 +496,8 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
                Cnf: map[string]string{
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, 100)}
+               WrittenSpans: common.NewSemaphore(0),
+       }
        ht, err := htraceBld.Build()
        if err != nil {
                panic(err)
@@ -502,7 +505,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) {
        defer ht.Close()
        createSpans(SIMPLE_TEST_SPANS, ht.Store)
        waitForMetrics(ht.Store.msink, common.SpanMetricsMap{
-               "127.0.0.1:1234": &common.SpanMetrics{
+               "127.0.0.1": &common.SpanMetrics{
                        Written: uint64(len(SIMPLE_TEST_SPANS)),
                },
        })

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index 353beae..667a17b 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -53,8 +53,8 @@ type MiniHTracedBuilder struct {
        // If true, we will keep the data dirs around after MiniHTraced#Close
        KeepDataDirsOnClose bool
 
-       // If non-null, the WrittenSpans channel to use when creating the 
DataStore.
-       WrittenSpans chan *common.Span
+       // If non-null, the WrittenSpans semaphore to use when creating the 
DataStore.
+       WrittenSpans *common.Semaphore
 
        // The test hooks to use for the HRPC server
        HrpcTestHooks *hrpcTestHooks

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
index 7aef1d1..dcc916a 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go
@@ -45,7 +45,7 @@ func TestReapingOldSpans(t *testing.T) {
                        conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  "1",
                        conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1",
                },
-               WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS),
+               WrittenSpans: common.NewSemaphore(0),
                DataDirs:     make([]string, 2),
        }
        ht, err := htraceBld.Build()
@@ -54,14 +54,12 @@ func TestReapingOldSpans(t *testing.T) {
        }
        for i := range testSpans {
                ht.Store.WriteSpan(&IncomingSpan{
-                       Addr: "127.0.0.1:1234",
+                       Addr: "127.0.0.1",
                        Span: testSpans[i],
                })
        }
        // Wait the spans to be created
-       for i := 0; i < len(testSpans); i++ {
-               <-ht.Store.WrittenSpans
-       }
+       ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS)
        // Set a reaper date that will remove all the spans except final one.
        ht.Store.rpr.SetReaperDate(now)
 

Reply via email to