http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go deleted file mode 100644 index 281ee2d..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ /dev/null @@ -1,761 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bytes" - "encoding/json" - "math/rand" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "org/apache/htrace/test" - "os" - "reflect" - "sort" - "testing" - "time" -) - -// Test creating and tearing down a datastore. -func TestCreateDatastore(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", - DataDirs: make([]string, 3)} - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() -} - -var SIMPLE_TEST_SPANS []common.Span = []common.Span{ - common.Span{Id: common.TestId("00000000000000000000000000000001"), - SpanData: common.SpanData{ - Begin: 123, - End: 456, - Description: "getFileDescriptors", - Parents: []common.SpanId{}, - TracerId: "firstd", - }}, - common.Span{Id: common.TestId("00000000000000000000000000000002"), - SpanData: common.SpanData{ - Begin: 125, - End: 200, - Description: "openFd", - Parents: []common.SpanId{common.TestId("00000000000000000000000000000001")}, - TracerId: "secondd", - }}, - common.Span{Id: common.TestId("00000000000000000000000000000003"), - SpanData: common.SpanData{ - Begin: 200, - End: 456, - Description: "passFd", - Parents: []common.SpanId{common.TestId("00000000000000000000000000000001")}, - TracerId: "thirdd", - }}, -} - -func createSpans(spans []common.Span, store *dataStore) { - ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "") - for idx := range spans { - ing.IngestSpan(&spans[idx]) - } - ing.Close(time.Now()) - store.WrittenSpans.Waits(int64(len(spans))) -} - -// Test creating a datastore and adding some spans. -func TestDatastoreWriteAndRead(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - - span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001")) - if span == nil { - t.Fatal() - } - if !span.Id.Equal(common.TestId("00000000000000000000000000000001")) { - t.Fatal() - } - common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span) - children := ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 1) - if len(children) != 1 { - t.Fatalf("expected 1 child, but got %d\n", len(children)) - } - children = ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 2) - if len(children) != 2 { - t.Fatalf("expected 2 children, but got %d\n", len(children)) - } - sort.Sort(common.SpanIdSlice(children)) - if !children[0].Equal(common.TestId("00000000000000000000000000000002")) { - t.Fatal() - } - if !children[1].Equal(common.TestId("00000000000000000000000000000003")) { - t.Fatal() - } -} - -func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, - expectedSpans []common.Span) { - testQueryExt(t, ht, query, expectedSpans, nil) -} - -func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query, - expectedSpans []common.Span, expectedNumScanned []int) { - spans, err, numScanned := ht.Store.HandleQuery(query) - if err != nil { - t.Fatalf("Query %s failed: %s\n", query.String(), err.Error()) - } - expectedBuf := new(bytes.Buffer) - dec := json.NewEncoder(expectedBuf) - err = dec.Encode(expectedSpans) - if err != nil { - t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error()) - } - spansBuf := new(bytes.Buffer) - dec = json.NewEncoder(spansBuf) - err = dec.Encode(spans) - if err != nil { - t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error()) - } - t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans), - len(expectedSpans)) - common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes())) - if expectedNumScanned != nil { - if !reflect.DeepEqual(expectedNumScanned, numScanned) { - t.Fatalf("Invalid values for numScanned: got %v, expected %v\n", - expectedNumScanned, numScanned) - } - } -} - -// Test queries on the datastore. -func TestSimpleQuery(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - - assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "125", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) -} - -func TestQueries2(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "125", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "125", - }, - common.Predicate{ - Op: common.EQUALS, - Field: common.DESCRIPTION, - Val: "getFileDescriptors", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[0]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.EQUALS, - Field: common.DESCRIPTION, - Val: "getFileDescriptors", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[0]}) -} - -func TestQueries3(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.CONTAINS, - Field: common.DESCRIPTION, - Val: "Fd", - }, - common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.BEGIN_TIME, - Val: "100", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: common.TestId("00000000000000000000000000000000").String(), - }, - }, - Lim: 200, - }, []common.Span{}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: common.TestId("00000000000000000000000000000002").String(), - }, - }, - Lim: 200, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) -} - -func TestQueries4(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries4", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.BEGIN_TIME, - Val: "125", - }, - }, - Lim: 5, - }, []common.Span{SIMPLE_TEST_SPANS[2]}) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN_OR_EQUALS, - Field: common.DESCRIPTION, - Val: "openFd", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.DESCRIPTION, - Val: "openFd", - }, - }, - Lim: 2, - }, []common.Span{SIMPLE_TEST_SPANS[2]}) -} - -var TEST_QUERIES5_SPANS []common.Span = []common.Span{ - common.Span{Id: common.TestId("10000000000000000000000000000001"), - SpanData: common.SpanData{ - Begin: 123, - End: 456, - Description: "span1", - Parents: []common.SpanId{}, - TracerId: "myTracer", - }}, - common.Span{Id: common.TestId("10000000000000000000000000000002"), - SpanData: common.SpanData{ - Begin: 123, - End: 200, - Description: "span2", - Parents: []common.SpanId{common.TestId("10000000000000000000000000000001")}, - TracerId: "myTracer", - }}, - common.Span{Id: common.TestId("10000000000000000000000000000003"), - SpanData: common.SpanData{ - Begin: 124, - End: 457, - Description: "span3", - Parents: []common.SpanId{common.TestId("10000000000000000000000000000001")}, - TracerId: "myTracer", - }}, -} - -func TestQueries5(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueries5", - WrittenSpans: common.NewSemaphore(0), - DataDirs: make([]string, 1), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(TEST_QUERIES5_SPANS, ht.Store) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.BEGIN_TIME, - Val: "123", - }, - }, - Lim: 5, - }, []common.Span{TEST_QUERIES5_SPANS[2]}) - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.END_TIME, - Val: "200", - }, - }, - Lim: 500, - }, []common.Span{TEST_QUERIES5_SPANS[0], TEST_QUERIES5_SPANS[2]}) - - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.END_TIME, - Val: "999", - }, - }, - Lim: 500, - }, []common.Span{TEST_QUERIES5_SPANS[2], - TEST_QUERIES5_SPANS[0], - TEST_QUERIES5_SPANS[1], - }) -} - -func BenchmarkDatastoreWrites(b *testing.B) { - htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - conf.HTRACE_LOG_LEVEL: "INFO", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - b.Fatalf("Error creating MiniHTraced: %s\n", err.Error()) - } - ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N) - defer func() { - if r := recover(); r != nil { - ht.Store.lg.Infof("panic: %s\n", r.(error)) - } - ht.Close() - }() - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - allSpans := make([]*common.Span, b.N) - for n := range allSpans { - allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n]) - } - - // Reset the timer to avoid including the time required to create new - // random spans in the benchmark total. - b.ResetTimer() - - // Write many random spans. - ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "") - for n := 0; n < b.N; n++ { - ing.IngestSpan(allSpans[n]) - } - ing.Close(time.Now()) - // Wait for all the spans to be written. - ht.Store.WrittenSpans.Waits(int64(b.N)) - assertNumWrittenEquals(b, ht.Store.msink, b.N) -} - -func verifySuccessfulLoad(t *testing.T, allSpans common.SpanSlice, - dataDirs []string) { - htraceBld := &MiniHTracedBuilder{ - Name: "TestReloadDataStore#verifySuccessfulLoad", - DataDirs: dataDirs, - KeepDataDirsOnClose: true, - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - defer hcl.Close() - for i := 0; i < len(allSpans); i++ { - span, err := hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } - // Look up the spans we wrote. - var span *common.Span - for i := 0; i < len(allSpans); i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } -} - -func verifyFailedLoad(t *testing.T, dataDirs []string, expectedErr string) { - htraceBld := &MiniHTracedBuilder{ - Name: "TestReloadDataStore#verifyFailedLoad", - DataDirs: dataDirs, - KeepDataDirsOnClose: true, - } - _, err := htraceBld.Build() - if err == nil { - t.Fatalf("expected failure to load, but the load succeeded.") - } - common.AssertErrContains(t, err, expectedErr) -} - -func TestReloadDataStore(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - DataDirs: make([]string, 2), - KeepDataDirsOnClose: true, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - dataDirs := make([]string, len(ht.DataDirs)) - copy(dataDirs, ht.DataDirs) - defer func() { - if ht != nil { - ht.Close() - } - for i := range dataDirs { - os.RemoveAll(dataDirs[i]) - } - }() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), nil) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - hcnf := ht.Cnf.Clone() - - // Create some random trace spans. - NUM_TEST_SPANS := 5 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - err = hcl.WriteSpans(allSpans) - if err != nil { - t.Fatalf("WriteSpans failed: %s\n", err.Error()) - } - ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS)) - - // Look up the spans we wrote. - var span *common.Span - for i := 0; i < NUM_TEST_SPANS; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) - if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) - } - common.ExpectSpansEqual(t, allSpans[i], span) - } - hcl.Close() - ht.Close() - ht = nil - - // Verify that we can reload the datastore, even if we configure the data - // directories in a different order. - verifySuccessfulLoad(t, allSpans, []string{dataDirs[1], dataDirs[0]}) - - // If we try to reload the datastore with only one directory, it won't work - // (we need both). - verifyFailedLoad(t, []string{dataDirs[1]}, - "The TotalShards field of all shards is 2, but we have 1 shards.") - - // Test that we give an intelligent error message when 0 directories are - // configured. - verifyFailedLoad(t, []string{}, "No shard directories found.") - - // Can't specify the same directory more than once... will get "lock - // already held by process" - verifyFailedLoad(t, []string{dataDirs[0], dataDirs[1], dataDirs[1]}, - " already held by process.") - - // Open the datastore and modify it to have the wrong DaemonId - dld := NewDataStoreLoader(hcnf) - defer func() { - if dld != nil { - dld.Close() - dld = nil - } - }() - dld.LoadShards() - sinfo, err := dld.shards[0].readShardInfo() - if err != nil { - t.Fatalf("error reading shard info for shard %s: %s\n", - dld.shards[0].path, err.Error()) - } - newDaemonId := sinfo.DaemonId + 1 - dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x\n.", - asJson(sinfo), dld.shards[0].path, newDaemonId) - sinfo.DaemonId = newDaemonId - err = dld.shards[0].writeShardInfo(sinfo) - if err != nil { - t.Fatalf("error writing shard info for shard %s: %s\n", - dld.shards[0].path, err.Error()) - } - dld.Close() - dld = nil - verifyFailedLoad(t, dataDirs, "DaemonId mismatch.") - - // Open the datastore and modify it to have the wrong TotalShards - dld = NewDataStoreLoader(hcnf) - dld.LoadShards() - sinfo, err = dld.shards[0].readShardInfo() - if err != nil { - t.Fatalf("error reading shard info for shard %s: %s\n", - dld.shards[0].path, err.Error()) - } - newDaemonId = sinfo.DaemonId - 1 - dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x, " + - "TotalShards to 3\n.", - asJson(sinfo), dld.shards[0].path, newDaemonId) - sinfo.DaemonId = newDaemonId - sinfo.TotalShards = 3 - err = dld.shards[0].writeShardInfo(sinfo) - if err != nil { - t.Fatalf("error writing shard info for shard %s: %s\n", - dld.shards[0].path, err.Error()) - } - dld.Close() - dld = nil - verifyFailedLoad(t, dataDirs, "TotalShards mismatch.") - - // Open the datastore and modify it to have the wrong LayoutVersion - dld = NewDataStoreLoader(hcnf) - dld.LoadShards() - for shardIdx := range(dld.shards) { - sinfo, err = dld.shards[shardIdx].readShardInfo() - if err != nil { - t.Fatalf("error reading shard info for shard %s: %s\n", - dld.shards[shardIdx].path, err.Error()) - } - dld.lg.Infof("Read %s from shard %s. Changing TotalShards to 2, " + - "LayoutVersion to 2\n", asJson(sinfo), dld.shards[shardIdx].path) - sinfo.TotalShards = 2 - sinfo.LayoutVersion = 2 - err = dld.shards[shardIdx].writeShardInfo(sinfo) - if err != nil { - t.Fatalf("error writing shard info for shard %s: %s\n", - dld.shards[0].path, err.Error()) - } - } - dld.Close() - dld = nil - verifyFailedLoad(t, dataDirs, "The layout version of all shards is 2, " + - "but we only support") - - // It should work with data.store.clear set. - htraceBld = &MiniHTracedBuilder{ - Name: "TestReloadDataStore#clear", - DataDirs: dataDirs, - KeepDataDirsOnClose: true, - Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}, - } - ht, err = htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } -} - -func TestQueriesWithContinuationTokens1(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", - Cnf: map[string]string{ - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", - }, - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) - // Adding a prev value to this query excludes the first result that we - // would normally get. - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.BEGIN_TIME, - Val: "120", - }, - }, - Lim: 5, - Prev: &SIMPLE_TEST_SPANS[0], - }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) - - // There is only one result from an EQUALS query on SPAN_ID. - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.EQUALS, - Field: common.SPAN_ID, - Val: common.TestId("00000000000000000000000000000001").String(), - }, - }, - Lim: 100, - Prev: &SIMPLE_TEST_SPANS[0], - }, []common.Span{}) - - // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the - // span we pass as a continuation token. (Primary index edition). - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.LESS_THAN_OR_EQUALS, - Field: common.SPAN_ID, - Val: common.TestId("00000000000000000000000000000002").String(), - }, - }, - Lim: 100, - Prev: &SIMPLE_TEST_SPANS[1], - }, []common.Span{SIMPLE_TEST_SPANS[0]}) - - // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the - // span we pass as a continuation token. (Secondary index edition). - testQuery(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.GREATER_THAN, - Field: common.DURATION, - Val: "0", - }, - }, - Lim: 100, - Prev: &SIMPLE_TEST_SPANS[1], - }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]}) -} - -func TestQueryRowsScanned(t *testing.T) { - t.Parallel() - htraceBld := &MiniHTracedBuilder{Name: "TestQueryRowsScanned", - WrittenSpans: common.NewSemaphore(0), - } - ht, err := htraceBld.Build() - if err != nil { - panic(err) - } - defer ht.Close() - createSpans(SIMPLE_TEST_SPANS, ht.Store) - assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) - testQueryExt(t, ht, &common.Query{ - Predicates: []common.Predicate{ - common.Predicate{ - Op: common.EQUALS, - Field: common.SPAN_ID, - Val: common.TestId("00000000000000000000000000000001").String(), - }, - }, - Lim: 100, - Prev: nil, - }, []common.Span{SIMPLE_TEST_SPANS[0]}, - []int{2, 1}) -}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go deleted file mode 100644 index 49a21ee..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater.go +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "org/apache/htrace/common" - "sync" - "time" -) - -type Heartbeater struct { - // The name of this heartbeater - name string - - // How long to sleep between heartbeats, in milliseconds. - periodMs int64 - - // The logger to use. - lg *common.Logger - - // The channels to send the heartbeat on. - targets []HeartbeatTarget - - // Incoming requests to the heartbeater. When this is closed, the - // heartbeater will exit. - req chan *HeartbeatTarget - - wg sync.WaitGroup -} - -type HeartbeatTarget struct { - // The name of the heartbeat target. - name string - - // The channel for the heartbeat target. - targetChan chan interface{} -} - -func (tgt *HeartbeatTarget) String() string { - return tgt.name -} - -func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater { - hb := &Heartbeater{ - name: name, - periodMs: periodMs, - lg: lg, - targets: make([]HeartbeatTarget, 0, 4), - req: make(chan *HeartbeatTarget), - } - hb.wg.Add(1) - go hb.run() - return hb -} - -func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) { - hb.req <- tgt -} - -func (hb *Heartbeater) Shutdown() { - close(hb.req) - hb.wg.Wait() -} - -func (hb *Heartbeater) String() string { - return hb.name -} - -func (hb *Heartbeater) run() { - defer func() { - hb.lg.Debugf("%s: exiting.\n", hb.String()) - hb.wg.Done() - }() - period := time.Duration(hb.periodMs) * time.Millisecond - for { - periodEnd := time.Now().Add(period) - for { - timeToWait := periodEnd.Sub(time.Now()) - if timeToWait <= 0 { - break - } else if timeToWait > period { - // Smooth over jitter or clock changes - timeToWait = period - periodEnd = time.Now().Add(period) - } - select { - case tgt, open := <-hb.req: - if !open { - return - } - hb.targets = append(hb.targets, *tgt) - hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String()) - case <-time.After(timeToWait): - } - } - for targetIdx := range hb.targets { - select { - case hb.targets[targetIdx].targetChan <- nil: - default: - // We failed to send a heartbeat because the other goroutine was busy and - // hasn't cleared the previous one from its channel. This could indicate a - // stuck goroutine. - hb.lg.Infof("%s: could not send heartbeat to %s.\n", - hb.String(), hb.targets[targetIdx]) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go deleted file mode 100644 index cbde7fc..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/heartbeater_test.go +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "org/apache/htrace/common" - "org/apache/htrace/conf" - "testing" - "time" -) - -func TestHeartbeaterStartupShutdown(t *testing.T) { - cnfBld := conf.Builder{ - Values: conf.TEST_VALUES(), - Defaults: conf.DEFAULTS, - } - cnf, err := cnfBld.Build() - if err != nil { - t.Fatalf("failed to create conf: %s", err.Error()) - } - lg := common.NewLogger("heartbeater", cnf) - hb := NewHeartbeater("ExampleHeartbeater", 1, lg) - if hb.String() != "ExampleHeartbeater" { - t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater") - } - hb.Shutdown() -} - -// The number of milliseconds between heartbeats -const HEARTBEATER_PERIOD = 5 - -// The number of heartbeats to send in the test. -const NUM_TEST_HEARTBEATS = 3 - -func TestHeartbeaterSendsHeartbeats(t *testing.T) { - cnfBld := conf.Builder{ - Values: conf.TEST_VALUES(), - Defaults: conf.DEFAULTS, - } - cnf, err := cnfBld.Build() - if err != nil { - t.Fatalf("failed to create conf: %s", err.Error()) - } - lg := common.NewLogger("heartbeater", cnf) - // The minimum amount of time which the heartbeater test should take - MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD) - duration := MINIMUM_TEST_DURATION - for duration <= MINIMUM_TEST_DURATION { - start := time.Now() - testHeartbeaterSendsHeartbeatsImpl(t, lg) - end := time.Now() - duration = end.Sub(start) - lg.Debugf("Measured duration: %v; minimum expected duration: %v\n", - duration, MINIMUM_TEST_DURATION) - } -} - -func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) { - hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg) - if hb.String() != "ExampleHeartbeater" { - t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater") - } - testChan := make(chan interface{}, NUM_TEST_HEARTBEATS) - gotAllHeartbeats := make(chan bool) - hb.AddHeartbeatTarget(&HeartbeatTarget{ - name: "ExampleHeartbeatTarget", - targetChan: testChan, - }) - go func() { - for i := 0; i < NUM_TEST_HEARTBEATS; i++ { - <-testChan - } - gotAllHeartbeats <- true - for i := 0; i < NUM_TEST_HEARTBEATS; i++ { - _, open := <-testChan - if !open { - return - } - } - }() - <-gotAllHeartbeats - hb.Shutdown() -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go deleted file mode 100644 index ecd13d4..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bufio" - "bytes" - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "github.com/ugorji/go/codec" - "io" - "net" - "net/rpc" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "sync" - "sync/atomic" - "time" -) - -const MAX_HRPC_HANDLERS = 32765 - -// Handles HRPC calls -type HrpcHandler struct { - lg *common.Logger - store *dataStore -} - -// The HRPC server -type HrpcServer struct { - *rpc.Server - hand *HrpcHandler - - // The listener we are using to accept new connections. - listener net.Listener - - // A WaitGroup used to block until the HRPC server has exited. - exited sync.WaitGroup - - // A channel containing server codecs to use. This channel is fully - // buffered. The number of entries it initially contains determines how - // many concurrent codecs we will have running at once. - cdcs chan *HrpcServerCodec - - // Used to shut down - shutdown chan interface{} - - // The I/O timeout to use when reading requests or sending responses. This - // timeout does not apply to the time we spend processing the message. - ioTimeo time.Duration - - // A count of all I/O errors that we have encountered since the server - // started. This counts errors like improperly formatted message frames, - // but not errors like properly formatted but invalid messages. - // This count is updated from multiple goroutines via sync/atomic. - ioErrorCount uint64 - - // The test hooks to use, or nil during normal operation. - testHooks *hrpcTestHooks -} - -type hrpcTestHooks struct { - // A callback we make right after calling Accept() but before reading from - // the new connection. - HandleAdmission func() -} - -// A codec which encodes HRPC data via JSON. This structure holds the context -// for a particular client connection. -type HrpcServerCodec struct { - lg *common.Logger - - // The current connection. - conn net.Conn - - // The HrpcServer which this connection is part of. - hsv *HrpcServer - - // The message length we read from the header. - length uint32 - - // The number of messages this connection has handled. - numHandled int - - // The buffer for reading requests. These buffers are reused for multiple - // requests to avoid allocating memory. - buf []byte - - // Configuration for msgpack decoding - msgpackHandle codec.MsgpackHandle -} - -func asJson(val interface{}) string { - js, err := json.Marshal(val) - if err != nil { - return "encoding error: " + err.Error() - } - return string(js) -} - -func newIoErrorWarn(cdc *HrpcServerCodec, val string) error { - return newIoError(cdc, val, common.WARN) -} - -func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error { - if cdc.lg.LevelEnabled(level) { - cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": "+val+"\n") - } - if level >= common.INFO { - atomic.AddUint64(&cdc.hsv.ioErrorCount, 1) - } - return errors.New(val) -} - -func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { - hdr := common.HrpcRequestHeader{} - if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr()) - } - cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo)) - err := binary.Read(cdc.conn, binary.LittleEndian, &hdr) - if err != nil { - if err == io.EOF && cdc.numHandled > 0 { - return newIoError(cdc, fmt.Sprintf("Remote closed connection "+ - "after writing %d message(s)", cdc.numHandled), common.DEBUG) - } - return newIoError(cdc, - fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN) - } - if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("%s: Read HRPC request header %s\n", - cdc.conn.RemoteAddr(), asJson(&hdr)) - } - if hdr.Magic != common.HRPC_MAGIC { - return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+ - "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic)) - } - if hdr.Length > common.MAX_HRPC_BODY_LENGTH { - return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. "+ - "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, - hdr.Length)) - } - req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) - if req.ServiceMethod == "" { - return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x", - hdr.MethodId)) - } - req.Seq = hdr.Seq - cdc.length = hdr.Length - return nil -} - -func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { - remoteAddr := cdc.conn.RemoteAddr().String() - if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n", - remoteAddr, cdc.length) - } - if cap(cdc.buf) < int(cdc.length) { - var pow uint - for pow=0;(1<<pow) < int(cdc.length);pow++ { - } - cdc.buf = make([]byte, 0, 1<<pow) - } - _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length]) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+ - "request body: %s", cdc.length, err.Error())) - } - var zeroTime time.Time - cdc.conn.SetDeadline(zeroTime) - - dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle) - err = dec.Decode(body) - if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("%s: read HRPC message: %s\n", - remoteAddr, asJson(&body)) - } - req := body.(*common.WriteSpansReq) - if req == nil { - return nil - } - // We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage - // collector with a ton of trace spans all at once. - startTime := time.Now() - client, _, err := net.SplitHostPort(remoteAddr) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+ - "for %s: %s\n", remoteAddr, err.Error())) - } - hand := cdc.hsv.hand - ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid) - for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ { - var span *common.Span - err := dec.Decode(&span) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d " + - "out of %d: %s\n", spanIdx, req.NumSpans, err.Error())) - } - ing.IngestSpan(span) - } - ing.Close(startTime) - return nil -} - -var EMPTY []byte = make([]byte, 0) - -func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error { - cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo)) - var err error - buf := EMPTY - if msg != nil { - w := bytes.NewBuffer(make([]byte, 0, 128)) - enc := codec.NewEncoder(w, &cdc.msgpackHandle) - err := enc.Encode(msg) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+ - "response message: %s", err.Error())) - } - buf = w.Bytes() - } - hdr := common.HrpcResponseHeader{} - hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod) - hdr.Seq = resp.Seq - hdr.ErrLength = uint32(len(resp.Error)) - hdr.Length = uint32(len(buf)) - writer := bufio.NewWriterSize(cdc.conn, 256) - err = binary.Write(writer, binary.LittleEndian, &hdr) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+ - "header: %s", err.Error())) - } - if hdr.ErrLength > 0 { - _, err = io.WriteString(writer, resp.Error) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+ - "string: %s", err.Error())) - } - } - if hdr.Length > 0 { - var length int - length, err = writer.Write(buf) - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+ - "message: %s", err.Error())) - } - if uint32(length) != hdr.Length { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+ - "response message: %s", err.Error())) - } - } - err = writer.Flush() - if err != nil { - return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+ - "bytes: %s", err.Error())) - } - cdc.numHandled++ - return nil -} - -func (cdc *HrpcServerCodec) Close() error { - err := cdc.conn.Close() - cdc.conn = nil - cdc.length = 0 - cdc.numHandled = 0 - cdc.hsv.cdcs <- cdc - return err -} - -func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, - resp *common.WriteSpansResp) (err error) { - // Nothing to do here; WriteSpans is handled in ReadRequestBody. - return nil -} - -func CreateHrpcServer(cnf *conf.Config, store *dataStore, - testHooks *hrpcTestHooks) (*HrpcServer, error) { - lg := common.NewLogger("hrpc", cnf) - numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS) - if numHandlers < 1 { - lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS) - numHandlers = 1 - } - if numHandlers > MAX_HRPC_HANDLERS { - lg.Warnf("%s cannot be more than %d: using %d handlers\n", - conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS) - numHandlers = MAX_HRPC_HANDLERS - } - hsv := &HrpcServer{ - Server: rpc.NewServer(), - hand: &HrpcHandler{ - lg: lg, - store: store, - }, - cdcs: make(chan *HrpcServerCodec, numHandlers), - shutdown: make(chan interface{}), - ioTimeo: time.Millisecond * - time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)), - testHooks: testHooks, - } - for i := 0; i < numHandlers; i++ { - hsv.cdcs <- &HrpcServerCodec{ - lg: lg, - hsv: hsv, - msgpackHandle: codec.MsgpackHandle { - WriteExt: true, - }, - } - } - var err error - hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS)) - if err != nil { - return nil, err - } - hsv.Server.Register(hsv.hand) - hsv.exited.Add(1) - go hsv.run() - lg.Infof("Started HRPC server on %s with %d handler routines. "+ - "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers, - hsv.ioTimeo.String()) - return hsv, nil -} - -func (hsv *HrpcServer) run() { - lg := hsv.hand.lg - srvAddr := hsv.listener.Addr().String() - defer func() { - lg.Infof("HrpcServer on %s exiting\n", srvAddr) - hsv.exited.Done() - }() - for { - select { - case cdc := <-hsv.cdcs: - conn, err := hsv.listener.Accept() - if err != nil { - lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error()) - hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space - continue - } - if lg.TraceEnabled() { - lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr()) - } - cdc.conn = conn - cdc.numHandled = 0 - if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil { - hsv.testHooks.HandleAdmission() - } - go hsv.ServeCodec(cdc) - case <-hsv.shutdown: - return - } - } -} - -func (hsv *HrpcServer) Addr() net.Addr { - return hsv.listener.Addr() -} - -func (hsv *HrpcServer) GetNumIoErrors() uint64 { - return atomic.LoadUint64(&hsv.ioErrorCount) -} - -func (hsv *HrpcServer) Close() { - close(hsv.shutdown) - hsv.listener.Close() - hsv.exited.Wait() -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go deleted file mode 100644 index 35ee753..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bufio" - "encoding/json" - "fmt" - "github.com/alecthomas/kingpin" - "github.com/jmhodges/levigo" - "net" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "os" - "runtime" - "time" -) - -var RELEASE_VERSION string -var GIT_VERSION string - -const USAGE = `htraced: the HTrace server daemon. - -htraced receives trace spans sent from HTrace clients. It exposes a REST -interface which others can query. It also runs a web server with a graphical -user interface. htraced stores its span data in levelDB files on the local -disks. - -Usage: ---help: this help message - --Dk=v: set configuration key 'k' to value 'v' -For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost, -port 8080. -Dlog.level=DEBUG will set the default log level to DEBUG. - --Dk: set configuration key 'k' to 'true' - -Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + ` -configuration file. We find this file by searching the paths in the -` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way -of setting configuration when launching the daemon. -` - -func main() { - // Load the htraced configuration. - // This also parses the -Dfoo=bar command line arguments and removes them - // from os.Argv. - cnf, cnfLog := conf.LoadApplicationConfig("htraced.") - - // Parse the remaining command-line arguments. - app := kingpin.New(os.Args[0], USAGE) - version := app.Command("version", "Print server version and exit.") - cmd := kingpin.MustParse(app.Parse(os.Args[1:])) - - // Handle the "version" command-line argument. - if cmd == version.FullCommand() { - fmt.Printf("Running htraced %s [%s].\n", RELEASE_VERSION, GIT_VERSION) - os.Exit(0) - } - - // Open the HTTP port. - // We want to do this first, before initializing the datastore or setting up - // logging. That way, if someone accidentally starts two daemons with the - // same config file, the second invocation will exit with a "port in use" - // error rather than potentially disrupting the first invocation. - rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS)) - if listenErr != nil { - fmt.Fprintf(os.Stderr, "Error opening HTTP port: %s\n", - listenErr.Error()) - os.Exit(1) - } - - // Print out the startup banner and information about the daemon - // configuration. - lg := common.NewLogger("main", cnf) - defer lg.Close() - lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION) - scanner := bufio.NewScanner(cnfLog) - for scanner.Scan() { - lg.Infof(scanner.Text() + "\n") - } - common.InstallSignalHandlers(cnf) - if runtime.GOMAXPROCS(0) == 1 { - ncpu := runtime.NumCPU() - runtime.GOMAXPROCS(ncpu) - lg.Infof("setting GOMAXPROCS=%d\n", ncpu) - } else { - lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0)) - } - lg.Infof("leveldb version=%d.%d\n", - levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion()) - - // Initialize the datastore. - store, err := CreateDataStore(cnf, nil) - if err != nil { - lg.Errorf("Error creating datastore: %s\n", err.Error()) - os.Exit(1) - } - var rsv *RestServer - rsv, err = CreateRestServer(cnf, store, rstListener) - if err != nil { - lg.Errorf("Error creating REST server: %s\n", err.Error()) - os.Exit(1) - } - var hsv *HrpcServer - if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { - hsv, err = CreateHrpcServer(cnf, store, nil) - if err != nil { - lg.Errorf("Error creating HRPC server: %s\n", err.Error()) - os.Exit(1) - } - } else { - lg.Infof("Not starting HRPC server because no value was given for %s.\n", - conf.HTRACE_HRPC_ADDRESS) - } - naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS) - if naddr != "" { - notif := StartupNotification{ - HttpAddr: rsv.Addr().String(), - ProcessId: os.Getpid(), - } - if hsv != nil { - notif.HrpcAddr = hsv.Addr().String() - } - err = sendStartupNotification(naddr, ¬if) - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+ - "%s\n", err.Error()) - os.Exit(1) - } - } - for { - time.Sleep(time.Duration(10) * time.Hour) - } -} - -// A startup notification message that we optionally send on startup. -// Used by unit tests. -type StartupNotification struct { - HttpAddr string - HrpcAddr string - ProcessId int -} - -func sendStartupNotification(naddr string, notif *StartupNotification) error { - conn, err := net.Dial("tcp", naddr) - if err != nil { - return err - } - defer func() { - if conn != nil { - conn.Close() - } - }() - var buf []byte - buf, err = json.Marshal(notif) - if err != nil { - return err - } - _, err = conn.Write(buf) - conn.Close() - conn = nil - return nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go deleted file mode 100644 index 5914004..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go +++ /dev/null @@ -1,511 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "bytes" - "errors" - "fmt" - "github.com/jmhodges/levigo" - "github.com/ugorji/go/codec" - "io" - "math" - "math/rand" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "os" - "strings" - "syscall" - "time" -) - -// Routines for loading the datastore. - -// The leveldb key which has information about the shard. -const SHARD_INFO_KEY = 'w' - -// A constant signifying that we don't know what the layout version is. -const UNKNOWN_LAYOUT_VERSION = 0 - -// The current layout version. We cannot read layout versions newer than this. -// We may sometimes be able to read older versions, but only by doing an -// upgrade. -const CURRENT_LAYOUT_VERSION = 3 - -type DataStoreLoader struct { - // The dataStore logger. - lg *common.Logger - - // True if we should clear the stored data. - ClearStored bool - - // The shards that we're loading - shards []*ShardLoader - - // The options to use for opening datastores in LevelDB. - openOpts *levigo.Options - - // The read options to use for LevelDB. - readOpts *levigo.ReadOptions - - // The write options to use for LevelDB. - writeOpts *levigo.WriteOptions -} - -// Information about a Shard. -type ShardInfo struct { - // The layout version of the datastore. - // We should always keep this field so that old software can recognize new - // layout versions, even if it can't read them. - LayoutVersion uint64 - - // A random number identifying this daemon. - DaemonId uint64 - - // The total number of shards in this datastore. - TotalShards uint32 - - // The index of this shard within the datastore. - ShardIndex uint32 -} - -// Create a new datastore loader. -// Initializes the loader, but does not load any leveldb instances. -func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader { - dld := &DataStoreLoader{ - lg: common.NewLogger("datastore", cnf), - ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR), - } - dld.readOpts = levigo.NewReadOptions() - dld.readOpts.SetFillCache(true) - dld.readOpts.SetVerifyChecksums(false) - dld.writeOpts = levigo.NewWriteOptions() - dld.writeOpts.SetSync(false) - dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) - rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) - // Filter out empty entries - dirs := make([]string, 0, len(rdirs)) - for i := range(rdirs) { - if strings.TrimSpace(rdirs[i]) != "" { - dirs = append(dirs, rdirs[i]) - } - } - dld.shards = make([]*ShardLoader, len(dirs)) - for i := range(dirs) { - dld.shards[i] = &ShardLoader{ - dld: dld, - path: dirs[i] + conf.PATH_SEP + "db", - } - } - dld.openOpts = levigo.NewOptions() - cacheSize := cnf.GetInt(conf.HTRACE_LEVELDB_CACHE_SIZE) - dld.openOpts.SetCache(levigo.NewLRUCache(cacheSize)) - dld.openOpts.SetParanoidChecks(false) - writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE) - if writeBufferSize > 0 { - dld.openOpts.SetWriteBufferSize(writeBufferSize) - } - maxFdPerShard := dld.calculateMaxOpenFilesPerShard() - if maxFdPerShard > 0 { - dld.openOpts.SetMaxOpenFiles(maxFdPerShard) - } - return dld -} - -func (dld *DataStoreLoader) Close() { - if dld.lg != nil { - dld.lg.Close() - dld.lg = nil - } - if dld.openOpts != nil { - dld.openOpts.Close() - dld.openOpts = nil - } - if dld.readOpts != nil { - dld.readOpts.Close() - dld.readOpts = nil - } - if dld.writeOpts != nil { - dld.writeOpts.Close() - dld.writeOpts = nil - } - if dld.shards != nil { - for i := range(dld.shards) { - if dld.shards[i] != nil { - dld.shards[i].Close() - } - } - dld.shards = nil - } -} - -func (dld *DataStoreLoader) DisownResources() { - dld.lg = nil - dld.openOpts = nil - dld.readOpts = nil - dld.writeOpts = nil - dld.shards = nil -} - -// The maximum number of file descriptors we'll use on non-datastore things. -const NON_DATASTORE_FD_MAX = 300 - -// The minimum number of file descriptors per shard we will set. Setting fewer -// than this number could trigger a bug in some early versions of leveldb. -const MIN_FDS_PER_SHARD = 80 - -func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int { - var rlim syscall.Rlimit - err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim) - if err != nil { - dld.lg.Warnf("Unable to calculate maximum open files per shard: " + - "getrlimit failed: %s\n", err.Error()) - return 0 - } - // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems, - // but there's no harm in being careful. 'int' in golang always holds at - // least 32 bits. - var maxFd int - if rlim.Cur > uint64(math.MaxInt32) { - maxFd = math.MaxInt32 - } else { - maxFd = int(rlim.Cur) - } - if len(dld.shards) == 0 { - dld.lg.Warnf("Unable to calculate maximum open files per shard, " + - "since there are 0 shards configured.\n") - return 0 - } - fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards) - if fdsPerShard < MIN_FDS_PER_SHARD { - dld.lg.Warnf("Expected to be able to use at least %d " + - "fds per shard, but we have %d shards and %d total fds to allocate, " + - "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD, - len(dld.shards), maxFd - NON_DATASTORE_FD_MAX, fdsPerShard) - return 0 - } - dld.lg.Infof("maxFd = %d. Setting maxFdPerShard = %d\n", - maxFd, fdsPerShard) - return fdsPerShard -} - -// Load information about all shards. -func (dld *DataStoreLoader) LoadShards() { - for i := range(dld.shards) { - shd := dld.shards[i] - shd.load() - } -} - -// Verify that the shard infos are consistent. -// Reorders the shardInfo structures based on their ShardIndex. -func (dld *DataStoreLoader) VerifyShardInfos() error { - if len(dld.shards) < 1 { - return errors.New("No shard directories found.") - } - // Make sure no shards had errors. - for i := range(dld.shards) { - shd := dld.shards[i] - if shd.infoErr != nil { - return shd.infoErr - } - } - // Make sure that if any shards are empty, all shards are empty. - emptyShards := "" - prefix := "" - for i := range(dld.shards) { - if dld.shards[i].info == nil { - emptyShards = prefix + dld.shards[i].path - prefix = ", " - } - } - if emptyShards != "" { - for i := range(dld.shards) { - if dld.shards[i].info != nil { - return errors.New(fmt.Sprintf("Shards %s were empty, but " + - "the other shards had data.", emptyShards)) - } - } - // All shards are empty. - return nil - } - // Make sure that all shards have the same layout version, daemonId, and number of total - // shards. - layoutVersion := dld.shards[0].info.LayoutVersion - daemonId := dld.shards[0].info.DaemonId - totalShards := dld.shards[0].info.TotalShards - for i := 1; i < len(dld.shards); i++ { - shd := dld.shards[i] - if layoutVersion != shd.info.LayoutVersion { - return errors.New(fmt.Sprintf("Layout version mismatch. Shard " + - "%s has layout version 0x%016x, but shard %s has layout " + - "version 0x%016x.", - dld.shards[0].path, layoutVersion, shd.path, shd.info.LayoutVersion)) - } - if daemonId != shd.info.DaemonId { - return errors.New(fmt.Sprintf("DaemonId mismatch. Shard %s has " + - "daemonId 0x%016x, but shard %s has daemonId 0x%016x.", - dld.shards[0].path, daemonId, shd.path, shd.info.DaemonId)) - } - if totalShards != shd.info.TotalShards { - return errors.New(fmt.Sprintf("TotalShards mismatch. Shard %s has " + - "TotalShards = %d, but shard %s has TotalShards = %d.", - dld.shards[0].path, totalShards, shd.path, shd.info.TotalShards)) - } - if shd.info.ShardIndex >= totalShards { - return errors.New(fmt.Sprintf("Invalid ShardIndex. Shard %s has " + - "ShardIndex = %d, but TotalShards = %d.", - shd.path, shd.info.ShardIndex, shd.info.TotalShards)) - } - } - if layoutVersion != CURRENT_LAYOUT_VERSION { - return errors.New(fmt.Sprintf("The layout version of all shards " + - "is %d, but we only support version %d.", - layoutVersion, CURRENT_LAYOUT_VERSION)) - } - if totalShards != uint32(len(dld.shards)) { - return errors.New(fmt.Sprintf("The TotalShards field of all shards " + - "is %d, but we have %d shards.", totalShards, len(dld.shards))) - } - // Reorder shards in order of their ShardIndex. - reorderedShards := make([]*ShardLoader, len(dld.shards)) - for i := 0; i < len(dld.shards); i++ { - shd := dld.shards[i] - shardIdx := shd.info.ShardIndex - if reorderedShards[shardIdx] != nil { - return errors.New(fmt.Sprintf("Both shard %s and " + - "shard %s have ShardIndex %d.", shd.path, - reorderedShards[shardIdx].path, shardIdx)) - } - reorderedShards[shardIdx] = shd - } - dld.shards = reorderedShards - return nil -} - -func (dld *DataStoreLoader) Load() error { - var err error - // If data.store.clear was set, clear existing data. - if dld.ClearStored { - err = dld.clearStored() - if err != nil { - return err - } - } - // Make sure the shard directories exist in all cases, with a mkdir -p - for i := range dld.shards { - err := os.MkdirAll(dld.shards[i].path, 0777) - if err != nil { - return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): %s", - dld.shards[i].path, err.Error())) - } - } - // Get information about each shard, and verify them. - dld.LoadShards() - err = dld.VerifyShardInfos() - if err != nil { - return err - } - if dld.shards[0].ldb != nil { - dld.lg.Infof("Loaded %d leveldb instances with " + - "DaemonId of 0x%016x\n", len(dld.shards), - dld.shards[0].info.DaemonId) - } else { - // Create leveldb instances if needed. - rnd := rand.New(rand.NewSource(time.Now().UnixNano())) - daemonId := uint64(rnd.Int63()) - dld.lg.Infof("Initializing %d leveldb instances with a new " + - "DaemonId of 0x%016x\n", len(dld.shards), daemonId) - dld.openOpts.SetCreateIfMissing(true) - for i := range(dld.shards) { - shd := dld.shards[i] - shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts) - if err != nil { - return errors.New(fmt.Sprintf("levigo.Open(%s) failed to " + - "create the shard: %s", shd.path, err.Error())) - } - info := &ShardInfo { - LayoutVersion: CURRENT_LAYOUT_VERSION, - DaemonId: daemonId, - TotalShards: uint32(len(dld.shards)), - ShardIndex: uint32(i), - } - err = shd.writeShardInfo(info) - if err != nil { - return errors.New(fmt.Sprintf("levigo.Open(%s) failed to " + - "write shard info: %s", shd.path, err.Error())) - } - dld.lg.Infof("Shard %s initialized with ShardInfo %s \n", - shd.path, asJson(info)) - } - } - return nil -} - -func (dld *DataStoreLoader) clearStored() error { - for i := range dld.shards { - path := dld.shards[i].path - fi, err := os.Stat(path) - if err != nil && !os.IsNotExist(err) { - dld.lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) - return err - } - if fi != nil { - err = os.RemoveAll(path) - if err != nil { - dld.lg.Errorf("Failed to clear existing datastore directory %s: %s\n", - path, err.Error()) - return err - } - dld.lg.Infof("Cleared existing datastore directory %s\n", path) - } - } - return nil -} - -type ShardLoader struct { - // The parent DataStoreLoader - dld *DataStoreLoader - - // Path to the shard - path string - - // Leveldb instance of the shard - ldb *levigo.DB - - // Information about the shard - info *ShardInfo - - // If non-null, the error we encountered trying to load the shard info. - infoErr error -} - -func (shd *ShardLoader) Close() { - if shd.ldb != nil { - shd.ldb.Close() - shd.ldb = nil - } -} - -// Load information about a particular shard. -func (shd *ShardLoader) load() { - shd.info = nil - fi, err := os.Stat(shd.path) - if err != nil { - if os.IsNotExist(err) { - shd.infoErr = nil - return - } - shd.infoErr = errors.New(fmt.Sprintf( - "stat() error on leveldb directory " + - "%s: %s", shd.path, err.Error())) - return - } - if !fi.Mode().IsDir() { - shd.infoErr = errors.New(fmt.Sprintf( - "stat() error on leveldb directory " + - "%s: inode is not directory.", shd.path)) - return - } - var dbDir *os.File - dbDir, err = os.Open(shd.path) - if err != nil { - shd.infoErr = errors.New(fmt.Sprintf( - "open() error on leveldb directory " + - "%s: %s.", shd.path, err.Error())) - return - } - defer func() { - if dbDir != nil { - dbDir.Close() - } - }() - _, err = dbDir.Readdirnames(1) - if err != nil { - if err == io.EOF { - // The db directory is empty. - shd.infoErr = nil - return - } - shd.infoErr = errors.New(fmt.Sprintf( - "Readdirnames() error on leveldb directory " + - "%s: %s.", shd.path, err.Error())) - return - } - dbDir.Close() - dbDir = nil - shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts) - if err != nil { - shd.ldb = nil - shd.infoErr = errors.New(fmt.Sprintf( - "levigo.Open() error on leveldb directory " + - "%s: %s.", shd.path, err.Error())) - return - } - shd.info, err = shd.readShardInfo() - if err != nil { - shd.infoErr = err - return - } - shd.infoErr = nil -} - -func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) { - buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY}) - if err != nil { - return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed to " + - "read shard info key: %s", shd.path, err.Error())) - } - if len(buf) == 0 { - return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got zero-" + - "length value for shard info key.", shd.path)) - } - mh := new(codec.MsgpackHandle) - mh.WriteExt = true - r := bytes.NewBuffer(buf) - decoder := codec.NewDecoder(r, mh) - shardInfo := &ShardInfo { - LayoutVersion: UNKNOWN_LAYOUT_VERSION, - } - err = decoder.Decode(shardInfo) - if err != nil { - return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack " + - "decoding failed for shard info key: %s", shd.path, err.Error())) - } - return shardInfo, nil -} - -func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error { - mh := new(codec.MsgpackHandle) - mh.WriteExt = true - w := new(bytes.Buffer) - enc := codec.NewEncoder(w, mh) - err := enc.Encode(info) - if err != nil { - return errors.New(fmt.Sprintf("msgpack encoding error: %s", - err.Error())) - } - err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes()) - if err != nil { - return errors.New(fmt.Sprintf("leveldb write error: %s", - err.Error())) - } - return nil -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go deleted file mode 100644 index 9176de0..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics.go +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "math" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "sync" - "time" -) - -// -// The Metrics Sink for HTraced. -// -// The Metrics sink keeps track of metrics for the htraced daemon. -// It is important to have good metrics so that we can properly manager htraced. In particular, we -// need to know what rate we are receiving spans at, the main places spans came from. If spans -// were dropped because of a high sampling rates, we need to know which part of the system dropped -// them so that we can adjust the sampling rate there. -// - -const LATENCY_CIRC_BUF_SIZE = 4096 - -type MetricsSink struct { - // The metrics sink logger. - lg *common.Logger - - // The maximum number of entries we shuld allow in the HostSpanMetrics map. - maxMtx int - - // The total number of spans ingested by the server (counting dropped spans) - IngestedSpans uint64 - - // The total number of spans written to leveldb since the server started. - WrittenSpans uint64 - - // The total number of spans dropped by the server. - ServerDropped uint64 - - // Per-host Span Metrics - HostSpanMetrics common.SpanMetricsMap - - // The last few writeSpan latencies - wsLatencyCircBuf *CircBufU32 - - // Lock protecting all metrics - lock sync.Mutex -} - -func NewMetricsSink(cnf *conf.Config) *MetricsSink { - return &MetricsSink{ - lg: common.NewLogger("metrics", cnf), - maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES), - HostSpanMetrics: make(common.SpanMetricsMap), - wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE), - } -} - -// Update the total number of spans which were ingested, as well as other -// metrics that get updated during span ingest. -func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int, - serverDropped int, wsLatency time.Duration) { - msink.lock.Lock() - defer msink.lock.Unlock() - msink.IngestedSpans += uint64(totalIngested) - msink.ServerDropped += uint64(serverDropped) - msink.updateSpanMetrics(addr, 0, serverDropped) - wsLatencyMs := wsLatency.Nanoseconds() / 1000000 - var wsLatency32 uint32 - if wsLatencyMs > math.MaxUint32 { - wsLatency32 = math.MaxUint32 - } else { - wsLatency32 = uint32(wsLatencyMs) - } - msink.wsLatencyCircBuf.Append(wsLatency32) -} - -// Update the per-host span metrics. Must be called with the lock held. -func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int, - serverDropped int) { - mtx, found := msink.HostSpanMetrics[addr] - if !found { - // Ensure that the per-host span metrics map doesn't grow too large. - if len(msink.HostSpanMetrics) >= msink.maxMtx { - // Delete a random entry - for k := range msink.HostSpanMetrics { - msink.lg.Warnf("Evicting metrics entry for addr %s "+ - "because there are more than %d addrs.\n", k, msink.maxMtx) - delete(msink.HostSpanMetrics, k) - break - } - } - mtx = &common.SpanMetrics{} - msink.HostSpanMetrics[addr] = mtx - } - mtx.Written += uint64(numWritten) - mtx.ServerDropped += uint64(serverDropped) -} - -// Update the total number of spans which were persisted to disk. -func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int, - serverDropped int) { - msink.lock.Lock() - defer msink.lock.Unlock() - msink.WrittenSpans += uint64(totalWritten) - msink.ServerDropped += uint64(serverDropped) - msink.updateSpanMetrics(addr, totalWritten, serverDropped) -} - -// Read the server stats. -func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) { - msink.lock.Lock() - defer msink.lock.Unlock() - stats.IngestedSpans = msink.IngestedSpans - stats.WrittenSpans = msink.WrittenSpans - stats.ServerDroppedSpans = msink.ServerDropped - stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max() - stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average() - stats.HostSpanMetrics = make(common.SpanMetricsMap) - for k, v := range msink.HostSpanMetrics { - stats.HostSpanMetrics[k] = &common.SpanMetrics{ - Written: v.Written, - ServerDropped: v.ServerDropped, - } - } -} - -// A circular buffer of uint32s which supports appending and taking the -// average, and some other things. -type CircBufU32 struct { - // The next slot to fill - slot int - - // The number of slots which are in use. This number only ever - // increases until the buffer is full. - slotsUsed int - - // The buffer - buf []uint32 -} - -func NewCircBufU32(size int) *CircBufU32 { - return &CircBufU32{ - slotsUsed: -1, - buf: make([]uint32, size), - } -} - -func (cbuf *CircBufU32) Max() uint32 { - var max uint32 - for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ { - if cbuf.buf[bufIdx] > max { - max = cbuf.buf[bufIdx] - } - } - return max -} - -func (cbuf *CircBufU32) Average() uint32 { - var total uint64 - for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ { - total += uint64(cbuf.buf[bufIdx]) - } - return uint32(total / uint64(cbuf.slotsUsed)) -} - -func (cbuf *CircBufU32) Append(val uint32) { - cbuf.buf[cbuf.slot] = val - cbuf.slot++ - if cbuf.slotsUsed < cbuf.slot { - cbuf.slotsUsed = cbuf.slot - } - if cbuf.slot >= len(cbuf.buf) { - cbuf.slot = 0 - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go deleted file mode 100644 index 6daf640..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "fmt" - htrace "org/apache/htrace/client" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "reflect" - "testing" - "time" -) - -func compareTotals(a, b common.SpanMetricsMap) bool { - for k, v := range a { - if !reflect.DeepEqual(v, b[k]) { - return false - } - } - for k, v := range b { - if !reflect.DeepEqual(v, a[k]) { - return false - } - } - return true -} - -type Fatalfer interface { - Fatalf(format string, args ...interface{}) -} - -func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink, - expectedNumWritten int) { - var sstats common.ServerStats - msink.PopulateServerStats(&sstats) - if sstats.WrittenSpans != uint64(expectedNumWritten) { - t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n", - sstats.WrittenSpans, len(SIMPLE_TEST_SPANS)) - } - if sstats.HostSpanMetrics["127.0.0.1"] == nil { - t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.") - } - if sstats.HostSpanMetrics["127.0.0.1"].Written != - uint64(expectedNumWritten) { - t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but "+ - "expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written, - len(SIMPLE_TEST_SPANS)) - } -} - -func TestMetricsSinkPerHostEviction(t *testing.T) { - cnfBld := conf.Builder{ - Values: conf.TEST_VALUES(), - Defaults: conf.DEFAULTS, - } - cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2" - cnf, err := cnfBld.Build() - if err != nil { - t.Fatalf("failed to create conf: %s", err.Error()) - } - msink := NewMetricsSink(cnf) - msink.UpdatePersisted("192.168.0.100", 20, 10) - msink.UpdatePersisted("192.168.0.101", 20, 10) - msink.UpdatePersisted("192.168.0.102", 20, 10) - msink.lock.Lock() - defer msink.lock.Unlock() - if len(msink.HostSpanMetrics) != 2 { - for k, v := range msink.HostSpanMetrics { - fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v) - } - t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n", - len(msink.HostSpanMetrics)) - } -} - -func TestIngestedSpansMetricsRest(t *testing.T) { - testIngestedSpansMetricsImpl(t, false) -} - -func TestIngestedSpansMetricsPacked(t *testing.T) { - testIngestedSpansMetricsImpl(t, true) -} - -func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) { - htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics", - DataDirs: make([]string, 2), - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create datastore: %s", err.Error()) - } - defer ht.Close() - var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks{ - HrpcDisabled: !usePacked, - }) - if err != nil { - t.Fatalf("failed to create client: %s", err.Error()) - } - - NUM_TEST_SPANS := 12 - allSpans := createRandomTestSpans(NUM_TEST_SPANS) - err = hcl.WriteSpans(allSpans) - if err != nil { - t.Fatalf("WriteSpans failed: %s\n", err.Error()) - } - for { - var stats *common.ServerStats - stats, err = hcl.GetServerStats() - if err != nil { - t.Fatalf("GetServerStats failed: %s\n", err.Error()) - } - if stats.IngestedSpans == uint64(NUM_TEST_SPANS) { - break - } - time.Sleep(1 * time.Millisecond) - } -} - -func TestCircBuf32(t *testing.T) { - cbuf := NewCircBufU32(3) - // We arbitrarily define that empty circular buffers have an average of 0. - if cbuf.Average() != 0 { - t.Fatalf("expected empty CircBufU32 to have an average of 0.\n") - } - if cbuf.Max() != 0 { - t.Fatalf("expected empty CircBufU32 to have a max of 0.\n") - } - cbuf.Append(2) - if cbuf.Average() != 2 { - t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n") - } - cbuf.Append(10) - if cbuf.Average() != 6 { - t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n") - } - cbuf.Append(12) - if cbuf.Average() != 8 { - t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n") - } - cbuf.Append(14) - // The 14 overwrites the original 2 element. - if cbuf.Average() != 12 { - t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n") - } - cbuf.Append(1) - // The 1 overwrites the original 10 element. - if cbuf.Average() != 9 { - t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n") - } - if cbuf.Max() != 14 { - t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n") - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go deleted file mode 100644 index cf7ef67..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "fmt" - "io/ioutil" - "net" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "os" - "strings" -) - -// -// MiniHTraceD is used in unit tests to set up a daemon with certain settings. -// It takes care of things like creating and cleaning up temporary directories. -// - -// The default number of managed data directories to use. -const DEFAULT_NUM_DATA_DIRS = 2 - -// Builds a MiniHTraced object. -type MiniHTracedBuilder struct { - // The name of the MiniHTraced to build. This shows up in the test directory name and some - // other places. - Name string - - // The configuration values to use for the MiniHTraced. - // If ths is nil, we use the default configuration for everything. - Cnf map[string]string - - // The DataDirs to use. Empty entries will turn into random names. - DataDirs []string - - // If true, we will keep the data dirs around after MiniHTraced#Close - KeepDataDirsOnClose bool - - // If non-null, the WrittenSpans semaphore to use when creating the DataStore. - WrittenSpans *common.Semaphore - - // The test hooks to use for the HRPC server - HrpcTestHooks *hrpcTestHooks -} - -type MiniHTraced struct { - Name string - Cnf *conf.Config - DataDirs []string - Store *dataStore - Rsv *RestServer - Hsv *HrpcServer - Lg *common.Logger - KeepDataDirsOnClose bool -} - -func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { - var err error - var store *dataStore - var rsv *RestServer - var hsv *HrpcServer - if bld.Name == "" { - bld.Name = "HTraceTest" - } - if bld.Cnf == nil { - bld.Cnf = make(map[string]string) - } - if bld.DataDirs == nil { - bld.DataDirs = make([]string, 2) - } - for idx := range bld.DataDirs { - if bld.DataDirs[idx] == "" { - bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(), - fmt.Sprintf("%s%d", bld.Name, idx+1)) - if err != nil { - return nil, err - } - } - } - // Copy the default test configuration values. - for k, v := range conf.TEST_VALUES() { - _, hasVal := bld.Cnf[k] - if !hasVal { - bld.Cnf[k] = v - } - } - bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] = - strings.Join(bld.DataDirs, conf.PATH_LIST_SEP) - cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS} - cnf, err := cnfBld.Build() - if err != nil { - return nil, err - } - lg := common.NewLogger("mini.htraced", cnf) - defer func() { - if err != nil { - if store != nil { - store.Close() - } - for idx := range bld.DataDirs { - if !bld.KeepDataDirsOnClose { - if bld.DataDirs[idx] != "" { - os.RemoveAll(bld.DataDirs[idx]) - } - } - } - if rsv != nil { - rsv.Close() - } - lg.Infof("Failed to create MiniHTraced %s: %s\n", bld.Name, err.Error()) - lg.Close() - } - }() - store, err = CreateDataStore(cnf, bld.WrittenSpans) - if err != nil { - return nil, err - } - rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS)) - if listenErr != nil { - return nil, listenErr - } - defer func() { - if rstListener != nil { - rstListener.Close() - } - }() - rsv, err = CreateRestServer(cnf, store, rstListener) - if err != nil { - return nil, err - } - rstListener = nil - hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks) - if err != nil { - return nil, err - } - - lg.Infof("Created MiniHTraced %s\n", bld.Name) - return &MiniHTraced{ - Name: bld.Name, - Cnf: cnf, - DataDirs: bld.DataDirs, - Store: store, - Rsv: rsv, - Hsv: hsv, - Lg: lg, - KeepDataDirsOnClose: bld.KeepDataDirsOnClose, - }, nil -} - -// Return a Config object that clients can use to connect to this MiniHTraceD. -func (ht *MiniHTraced) ClientConf() *conf.Config { - return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(), - conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String()) -} - -// Return a Config object that clients can use to connect to this MiniHTraceD -// by HTTP only (no HRPC). -func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config { - return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(), - conf.HTRACE_HRPC_ADDRESS, "") -} - -func (ht *MiniHTraced) Close() { - ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name) - ht.Rsv.Close() - ht.Hsv.Close() - ht.Store.Close() - if !ht.KeepDataDirsOnClose { - for idx := range ht.DataDirs { - ht.Lg.Infof("Removing %s...\n", ht.DataDirs[idx]) - os.RemoveAll(ht.DataDirs[idx]) - } - } - ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name) - ht.Lg.Close() -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go deleted file mode 100644 index 2d6a76f..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/reaper_test.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package main - -import ( - "fmt" - "math/rand" - "org/apache/htrace/common" - "org/apache/htrace/conf" - "org/apache/htrace/test" - "testing" - "time" -) - -func TestReapingOldSpans(t *testing.T) { - const NUM_TEST_SPANS = 20 - testSpans := make([]*common.Span, NUM_TEST_SPANS) - rnd := rand.New(rand.NewSource(2)) - now := common.TimeToUnixMs(time.Now().UTC()) - for i := range testSpans { - testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i]) - testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i) - testSpans[i].Description = fmt.Sprintf("Span%02d", i) - } - htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans", - Cnf: map[string]string{ - conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000), - conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1", - conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1", - }, - WrittenSpans: common.NewSemaphore(0), - DataDirs: make([]string, 2), - } - ht, err := htraceBld.Build() - if err != nil { - t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error()) - } - ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "") - for spanIdx := range testSpans { - ing.IngestSpan(testSpans[spanIdx]) - } - ing.Close(time.Now()) - // Wait the spans to be created - ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS) - // Set a reaper date that will remove all the spans except final one. - ht.Store.rpr.SetReaperDate(now) - - common.WaitFor(5*time.Minute, time.Millisecond, func() bool { - for i := 0; i < NUM_TEST_SPANS-1; i++ { - span := ht.Store.FindSpan(testSpans[i].Id) - if span != nil { - ht.Store.lg.Debugf("Waiting for %s to be removed...\n", - testSpans[i].Description) - return false - } - } - span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id) - if span == nil { - ht.Store.lg.Debugf("Did not expect %s to be removed\n", - testSpans[NUM_TEST_SPANS-1].Description) - return false - } - return true - }) - defer ht.Close() -}
