Repository: incubator-htrace Updated Branches: refs/heads/master c715e12eb -> fe19368a3
HTRACE-301. htraced: fix unit tests that aren't waiting for spans to be written, use semaphore for WrittenSpans (Colin Patrick McCabe via iwasakims) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/fe19368a Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/fe19368a Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/fe19368a Branch: refs/heads/master Commit: fe19368a3cb46595fc511a28ef5d0b7df26abb21 Parents: c715e12 Author: Masatake Iwasaki <[email protected]> Authored: Fri Nov 20 11:20:06 2015 +0900 Committer: Masatake Iwasaki <[email protected]> Committed: Fri Nov 20 11:20:06 2015 +0900 ---------------------------------------------------------------------- .../src/org/apache/htrace/common/semaphore.go | 78 ++++++++++++++++++ .../org/apache/htrace/common/semaphore_test.go | 86 ++++++++++++++++++++ .../org/apache/htrace/htraced/client_test.go | 25 ++++-- .../src/org/apache/htrace/htraced/datastore.go | 32 ++++---- .../org/apache/htrace/htraced/datastore_test.go | 71 ++++++++-------- .../org/apache/htrace/htraced/mini_htraced.go | 4 +- .../org/apache/htrace/htraced/reaper_test.go | 8 +- 7 files changed, 238 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go new file mode 100644 index 0000000..1d56ae9 --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore.go @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "sync" +) + +// A simple lock-and-condition-variable based semaphore implementation. +type Semaphore struct { + lock sync.Mutex + cond *sync.Cond + count int64 +} + +func NewSemaphore(count int64) *Semaphore { + sem := &Semaphore { + count:int64(count), + } + sem.cond = &sync.Cond { + L: &sem.lock, + } + return sem +} + +func (sem *Semaphore) Post() { + sem.lock.Lock() + sem.count++ + if sem.count > 0 { + sem.cond.Broadcast() + } + sem.lock.Unlock() +} + +func (sem *Semaphore) Posts(amt int64) { + sem.lock.Lock() + sem.count+=amt + if sem.count > 0 { + sem.cond.Broadcast() + } + sem.lock.Unlock() +} + +func (sem *Semaphore) Wait() { + sem.lock.Lock() + for { + if sem.count > 0 { + sem.count-- + sem.lock.Unlock() + return + } + sem.cond.Wait() + } +} + +func (sem *Semaphore) Waits(amt int64) { + var i int64 + for i=0; i<amt; i++ { + sem.Wait() + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go new file mode 100644 index 0000000..089c51b --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/common/semaphore_test.go @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "sync/atomic" + "testing" + "time" +) + +func TestSemaphoreWake(t *testing.T) { + var done uint32 + sem := NewSemaphore(0) + go func() { + time.Sleep(10 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + sem.Wait() + doneVal := atomic.LoadUint32(&done) + if doneVal != 1 { + t.Fatalf("sem.Wait did not wait for sem.Post") + } +} + +func TestSemaphoreCount(t *testing.T) { + sem := NewSemaphore(1) + sem.Post() + sem.Wait() + sem.Wait() + + sem = NewSemaphore(-1) + sem.Post() + sem.Post() + sem.Wait() +} + +func TestSemaphoreMultipleGoroutines(t *testing.T) { + var done uint32 + sem := NewSemaphore(0) + sem2 := NewSemaphore(0) + go func() { + sem.Wait() + atomic.AddUint32(&done, 1) + sem2.Post() + }() + go func() { + time.Sleep(10 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + go func() { + time.Sleep(20 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + sem.Wait() + go func() { + time.Sleep(10 * time.Nanosecond) + atomic.AddUint32(&done, 1) + sem.Post() + }() + sem.Wait() + sem2.Wait() + doneVal := atomic.LoadUint32(&done) + if doneVal != 4 { + t.Fatalf("sem.Wait did not wait for sem.Posts") + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go index e4f2151..fae871c 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go @@ -46,6 +46,7 @@ func TestClientGetServerVersion(t *testing.T) { if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } + defer hcl.Close() _, err = hcl.GetServerVersion() if err != nil { t.Fatalf("failed to call GetServerVersion: %s", err.Error()) @@ -91,7 +92,9 @@ func createRandomTestSpans(amount int) common.SpanSlice { func TestClientOperations(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", - DataDirs: make([]string, 2)} + DataDirs: make([]string, 2), + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) @@ -102,6 +105,7 @@ func TestClientOperations(t *testing.T) { if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } + defer hcl.Close() // Create some random trace spans. NUM_TEST_SPANS := 30 @@ -115,6 +119,7 @@ func TestClientOperations(t *testing.T) { t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2, err.Error()) } + ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS/2)) // Look up the first half of the spans. They should be found. var span *common.Span @@ -181,7 +186,12 @@ func TestClientOperations(t *testing.T) { func TestDumpAll(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", - DataDirs: make([]string, 2)} + DataDirs: make([]string, 2), + WrittenSpans: common.NewSemaphore(0), + Cnf: map[string]string{ + conf.HTRACE_LOG_LEVEL: "INFO", + }, + } ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) @@ -192,6 +202,7 @@ func TestDumpAll(t *testing.T) { if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } + defer hcl.Close() NUM_TEST_SPANS := 100 allSpans := createRandomTestSpans(NUM_TEST_SPANS) @@ -202,7 +213,8 @@ func TestDumpAll(t *testing.T) { if err != nil { t.Fatalf("WriteSpans failed: %s\n", err.Error()) } - out := make(chan *common.Span, 50) + ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS)) + out := make(chan *common.Span, NUM_TEST_SPANS) var dumpErr error go func() { dumpErr = hcl.DumpAll(3, out) @@ -253,6 +265,7 @@ func TestClientGetServerConf(t *testing.T) { if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } + defer hcl.Close() serverCnf, err2 := hcl.GetServerConf() if err2 != nil { t.Fatalf("failed to call GetServerConf: %s", err2.Error()) @@ -293,7 +306,7 @@ func TestHrpcAdmissionsControl(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), }, - WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS), + WrittenSpans: common.NewSemaphore(0), HrpcTestHooks: testHooks, } ht, err := htraceBld.Build() @@ -319,9 +332,7 @@ func TestHrpcAdmissionsControl(t *testing.T) { }(iter) } wg.Wait() - for i := 0; i < TEST_NUM_WRITESPANS; i++ { - <-ht.Store.WrittenSpans - } + ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS)) } // Tests that HRPC I/O timeouts work. http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index a4bb320..8cd1526 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -108,8 +108,8 @@ type shard struct { // A channel for incoming heartbeats heartbeats chan interface{} - // The channel we will send a bool to when we exit. - exited chan bool + // Tracks whether the shard goroutine has exited. + exited sync.WaitGroup // Per-address metrics mtxMap ServerSpanMetricsMap @@ -123,7 +123,7 @@ func (shd *shard) processIncoming() { lg := shd.store.lg defer func() { lg.Infof("Shard processor for %s exiting.\n", shd.path) - shd.exited <- true + shd.exited.Done() }() for { select { @@ -289,8 +289,7 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error { } shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg) if shd.store.WrittenSpans != nil { - shd.store.lg.Errorf("WATERMELON: Sending span to shd.store.WrittenSpans\n") - shd.store.WrittenSpans <- span + shd.store.WrittenSpans.Post() } return nil } @@ -325,9 +324,7 @@ func (shd *shard) Close() { lg := shd.store.lg shd.incoming <- nil lg.Infof("Waiting for %s to exit...\n", shd.path) - if shd.exited != nil { - <-shd.exited - } + shd.exited.Wait() shd.ldb.Close() lg.Infof("Closed %s...\n", shd.path) } @@ -345,8 +342,8 @@ type Reaper struct { // A channel used to send heartbeats to the reaper heartbeats chan interface{} - // A channel used to block until the reaper goroutine has exited. - exited chan interface{} + // Tracks whether the reaper goroutine has exited + exited sync.WaitGroup // The lock protecting reaper data. lock sync.Mutex @@ -363,7 +360,6 @@ func NewReaper(cnf *conf.Config) *Reaper { lg: common.NewLogger("reaper", cnf), spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS), heartbeats: make(chan interface{}, 1), - exited: make(chan interface{}), } if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS { rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS @@ -372,6 +368,7 @@ func NewReaper(cnf *conf.Config) *Reaper { } rpr.hb = NewHeartbeater("ReaperHeartbeater", cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg) + rpr.exited.Add(1) go rpr.run() rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{ name: "reaper", @@ -390,7 +387,7 @@ func NewReaper(cnf *conf.Config) *Reaper { func (rpr *Reaper) run() { defer func() { rpr.lg.Info("Exiting Reaper goroutine.\n") - rpr.exited <- nil + rpr.exited.Done() }() for { @@ -442,7 +439,6 @@ func (rpr *Reaper) SetReaperDate(rdate int64) { func (rpr *Reaper) Shutdown() { rpr.hb.Shutdown() close(rpr.heartbeats) - <-rpr.exited } // The Data Store. @@ -458,9 +454,9 @@ type dataStore struct { // The write options to use for LevelDB. writeOpts *levigo.WriteOptions - // If non-null, a channel we will send spans to once we finish writing them. This is only used - // for testing. - WrittenSpans chan *common.Span + // If non-null, a semaphore we will increment once for each span we receive. + // Used for testing. + WrittenSpans *common.Semaphore // The metrics sink. msink *MetricsSink @@ -475,7 +471,7 @@ type dataStore struct { startMs int64 } -func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) { +func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) { // Get the configuration. clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR) dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) @@ -513,10 +509,10 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto store.rpr = NewReaper(cnf) for idx := range store.shards { shd := store.shards[idx] - shd.exited = make(chan bool, 1) shd.heartbeats = make(chan interface{}, 1) shd.mtxMap = make(ServerSpanMetricsMap) shd.maxMtx = store.msink.maxMtx + shd.exited.Add(1) go shd.processIncoming() } store.hb = NewHeartbeater("DatastoreHeartbeater", http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index 0443834..d9f4a0a 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -31,6 +31,7 @@ import ( "sort" "strings" "testing" + "time" ) // Test creating and tearing down a datastore. @@ -74,14 +75,11 @@ var SIMPLE_TEST_SPANS []common.Span = []common.Span{ func createSpans(spans []common.Span, store *dataStore) { for idx := range spans { store.WriteSpan(&IncomingSpan{ - Addr: "127.0.0.1:1234", + Addr: "127.0.0.1", Span: &spans[idx], }) } - // Wait the spans to be created - for i := 0; i < len(spans); i++ { - <-store.WrittenSpans - } + store.WrittenSpans.Waits(int64(len(spans))) } // Test creating a datastore and adding some spans. @@ -91,7 +89,8 @@ func TestDatastoreWriteAndRead(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, 100)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { panic(err) @@ -99,12 +98,6 @@ func TestDatastoreWriteAndRead(t *testing.T) { defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1:1234": &common.SpanMetrics{ - Written: uint64(len(SIMPLE_TEST_SPANS)), - }, - }) - span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001")) if span == nil { t.Fatal() @@ -160,7 +153,8 @@ func TestSimpleQuery(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, 100)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { panic(err) @@ -168,7 +162,7 @@ func TestSimpleQuery(t *testing.T) { defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1:1234": &common.SpanMetrics{ + "127.0.0.1": &common.SpanMetrics{ Written: uint64(len(SIMPLE_TEST_SPANS)), }, }) @@ -191,7 +185,8 @@ func TestQueries2(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, 100)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { panic(err) @@ -199,7 +194,7 @@ func TestQueries2(t *testing.T) { defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1:1234": &common.SpanMetrics{ + "127.0.0.1": &common.SpanMetrics{ Written: uint64(len(SIMPLE_TEST_SPANS)), }, }) @@ -248,7 +243,8 @@ func TestQueries3(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, 100)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { panic(err) @@ -256,7 +252,7 @@ func TestQueries3(t *testing.T) { defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1:1234": &common.SpanMetrics{ + "127.0.0.1": &common.SpanMetrics{ Written: uint64(len(SIMPLE_TEST_SPANS)), }, }) @@ -305,18 +301,15 @@ func TestQueries4(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, 100)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { panic(err) } defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) - waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1:1234": &common.SpanMetrics{ - Written: uint64(len(SIMPLE_TEST_SPANS)), - }, - }) + testQuery(t, ht, &common.Query{ Predicates: []common.Predicate{ common.Predicate{ @@ -355,13 +348,20 @@ func BenchmarkDatastoreWrites(b *testing.B) { conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "15000", conf.HTRACE_LOG_LEVEL: "INFO", }, - WrittenSpans: make(chan *common.Span, b.N)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { - panic(err) + b.Fatalf("Error creating MiniHTraced: %s\n", err.Error()) } - defer ht.Close() - rnd := rand.New(rand.NewSource(1)) + ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N) + defer func() { + if r := recover(); r != nil { + ht.Store.lg.Infof("panic: %s\n", r.(error)) + } + ht.Close() + }() + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) allSpans := make([]*common.Span, b.N) for n := range(allSpans) { allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n]) @@ -379,9 +379,7 @@ func BenchmarkDatastoreWrites(b *testing.B) { }) } // Wait for all the spans to be written. - for n := 0; n < b.N; n++ { - <-ht.Store.WrittenSpans - } + ht.Store.WrittenSpans.Waits(int64(b.N)) waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ "127.0.0.1": &common.SpanMetrics{ Written: uint64(b.N), // should be less than? @@ -394,7 +392,10 @@ func TestReloadDataStore(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - DataDirs: make([]string, 2), KeepDataDirsOnClose: true} + DataDirs: make([]string, 2), + KeepDataDirsOnClose: true, + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) @@ -424,6 +425,7 @@ func TestReloadDataStore(t *testing.T) { if err != nil { t.Fatalf("WriteSpans failed: %s\n", err.Error()) } + ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS)) // Look up the spans we wrote. var span *common.Span @@ -494,7 +496,8 @@ func TestQueriesWithContinuationTokens1(t *testing.T) { Cnf: map[string]string{ conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, 100)} + WrittenSpans: common.NewSemaphore(0), + } ht, err := htraceBld.Build() if err != nil { panic(err) @@ -502,7 +505,7 @@ func TestQueriesWithContinuationTokens1(t *testing.T) { defer ht.Close() createSpans(SIMPLE_TEST_SPANS, ht.Store) waitForMetrics(ht.Store.msink, common.SpanMetricsMap{ - "127.0.0.1:1234": &common.SpanMetrics{ + "127.0.0.1": &common.SpanMetrics{ Written: uint64(len(SIMPLE_TEST_SPANS)), }, }) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go index 353beae..667a17b 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -53,8 +53,8 @@ type MiniHTracedBuilder struct { // If true, we will keep the data dirs around after MiniHTraced#Close KeepDataDirsOnClose bool - // If non-null, the WrittenSpans channel to use when creating the DataStore. - WrittenSpans chan *common.Span + // If non-null, the WrittenSpans semaphore to use when creating the DataStore. + WrittenSpans *common.Semaphore // The test hooks to use for the HRPC server HrpcTestHooks *hrpcTestHooks http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fe19368a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go index 7aef1d1..dcc916a 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go @@ -45,7 +45,7 @@ func TestReapingOldSpans(t *testing.T) { conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1", conf.HTRACE_METRICS_HEARTBEAT_PERIOD_MS: "1", }, - WrittenSpans: make(chan *common.Span, NUM_TEST_SPANS), + WrittenSpans: common.NewSemaphore(0), DataDirs: make([]string, 2), } ht, err := htraceBld.Build() @@ -54,14 +54,12 @@ func TestReapingOldSpans(t *testing.T) { } for i := range testSpans { ht.Store.WriteSpan(&IncomingSpan{ - Addr: "127.0.0.1:1234", + Addr: "127.0.0.1", Span: testSpans[i], }) } // Wait the spans to be created - for i := 0; i < len(testSpans); i++ { - <-ht.Store.WrittenSpans - } + ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS) // Set a reaper date that will remove all the spans except final one. ht.Store.rpr.SetReaperDate(now)
