http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/conf/config_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/conf/config_test.go b/htrace-htraced/go/src/htrace/conf/config_test.go new file mode 100644 index 0000000..bdab187 --- /dev/null +++ b/htrace-htraced/go/src/htrace/conf/config_test.go @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "bytes" + "os" + "strings" + "testing" +) + +// Test that parsing command-line arguments of the form -Dfoo=bar works. +func TestParseArgV(t *testing.T) { + t.Parallel() + argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode", "-Dlog.path="} + bld := &Builder{Argv: argv, + Defaults: map[string]string{ + "log.path": "/log/path/default", + }} + cnf, err := bld.Build() + if err != nil { + t.Fatal() + } + if "bar" != cnf.Get("foo") { + t.Fatal() + } + if 123 != cnf.GetInt("baz") { + t.Fatal() + } + if !cnf.GetBool("sillyMode") { + t.Fatal() + } + if cnf.GetBool("otherSillyMode") { + t.Fatal() + } + if "" != cnf.Get("log.path") { + t.Fatal() + } +} + +// Test that default values work. +// Defaults are used only when the configuration option is not present or can't be parsed. +func TestDefaults(t *testing.T) { + t.Parallel() + argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"} + defaults := map[string]string{ + "foo": "notbar", + "baz": "456", + "foo2": "4611686018427387904", + } + bld := &Builder{Argv: argv, Defaults: defaults} + cnf, err := bld.Build() + if err != nil { + t.Fatal() + } + if "bar" != cnf.Get("foo") { + t.Fatal() + } + if 456 != cnf.GetInt("baz") { + t.Fatal() + } + if 4611686018427387904 != cnf.GetInt64("foo2") { + t.Fatal() + } +} + +// Test that we can parse our XML configuration file. +func TestXmlConfigurationFile(t *testing.T) { + t.Parallel() + xml := ` +<?xml version="1.0"?> +<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?> +<configuration> + <property> + <name>foo.bar</name> + <value>123</value> + </property> + <property> + <name>foo.baz</name> + <value>xmlValue</value> + </property> + <!--<property> + <name>commented.out</name> + <value>stuff</value> + </property>--> +</configuration> +` + xmlReader := strings.NewReader(xml) + argv := []string{"-Dfoo.bar=456"} + defaults := map[string]string{ + "foo.bar": "789", + "cmdline.opt": "4611686018427387904", + } + bld := &Builder{Argv: argv, Defaults: defaults, Reader: xmlReader} + cnf, err := bld.Build() + if err != nil { + t.Fatal() + } + // The command-line argument takes precedence over the XML and the defaults. + if 456 != cnf.GetInt("foo.bar") { + t.Fatal() + } + if "xmlValue" != cnf.Get("foo.baz") { + t.Fatalf("foo.baz = %s", cnf.Get("foo.baz")) + } + if "" != cnf.Get("commented.out") { + t.Fatal() + } + if 4611686018427387904 != cnf.GetInt64("cmdline.opt") { + t.Fatal() + } +} + +// Test our handling of the HTRACE_CONF_DIR environment variable. +func TestGetHTracedConfDirs(t *testing.T) { + os.Setenv("HTRACED_CONF_DIR", "") + dlog := new(bytes.Buffer) + dirs := getHTracedConfDirs(dlog) + if len(dirs) != 1 || dirs[0] != getDefaultHTracedConfDir() { + t.Fatal() + } + os.Setenv("HTRACED_CONF_DIR", "/foo/bar:/baz") + dirs = getHTracedConfDirs(dlog) + if len(dirs) != 2 || dirs[0] != "/foo/bar" || dirs[1] != "/baz" { + t.Fatal() + } +}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/conf/xml.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/conf/xml.go b/htrace-htraced/go/src/htrace/conf/xml.go new file mode 100644 index 0000000..de14bc5 --- /dev/null +++ b/htrace-htraced/go/src/htrace/conf/xml.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "encoding/xml" + "io" + "log" +) + +type configuration struct { + Properties []propertyXml `xml:"property"` +} + +type propertyXml struct { + Name string `xml:"name"` + Value string `xml:"value"` +} + +// Parse an XML configuration file. +func parseXml(reader io.Reader, m map[string]string) error { + dec := xml.NewDecoder(reader) + configurationXml := configuration{} + err := dec.Decode(&configurationXml) + if err != nil { + return err + } + props := configurationXml.Properties + for p := range props { + key := props[p].Name + value := props[p].Value + if key == "" { + log.Println("Warning: ignoring element with missing or empty <name>.") + continue + } + if value == "" { + log.Println("Warning: ignoring element with key " + key + " with missing or empty <value>.") + continue + } + //log.Printf("setting %s to %s\n", key, value) + m[key] = value + } + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/client_test.go b/htrace-htraced/go/src/htrace/htraced/client_test.go new file mode 100644 index 0000000..6b50097 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/client_test.go @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "fmt" + "github.com/ugorji/go/codec" + htrace "htrace/client" + "htrace/common" + "htrace/conf" + "htrace/test" + "math" + "math/rand" + "sort" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestClientGetServerVersion(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerVersion", + DataDirs: make([]string, 2)} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + _, err = hcl.GetServerVersion() + if err != nil { + t.Fatalf("failed to call GetServerVersion: %s", err.Error()) + } +} + +func TestClientGetServerDebugInfo(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerDebugInfo", + DataDirs: make([]string, 2)} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + debugInfo, err := hcl.GetServerDebugInfo() + if err != nil { + t.Fatalf("failed to call GetServerDebugInfo: %s", err.Error()) + } + if debugInfo.StackTraces == "" { + t.Fatalf(`debugInfo.StackTraces == ""`) + } + if debugInfo.GCStats == "" { + t.Fatalf(`debugInfo.GCStats == ""`) + } +} + +func createRandomTestSpans(amount int) common.SpanSlice { + rnd := rand.New(rand.NewSource(2)) + allSpans := make(common.SpanSlice, amount) + allSpans[0] = test.NewRandomSpan(rnd, allSpans[0:0]) + for i := 1; i < amount; i++ { + allSpans[i] = test.NewRandomSpan(rnd, allSpans[1:i]) + } + allSpans[1].SpanData.Parents = []common.SpanId{common.SpanId(allSpans[0].Id)} + return allSpans +} + +func TestClientOperations(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", + DataDirs: make([]string, 2), + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + + // Create some random trace spans. + NUM_TEST_SPANS := 30 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + + // Write half of the spans to htraced via the client. + err = hcl.WriteSpans(allSpans[0 : NUM_TEST_SPANS/2]) + if err != nil { + t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2, + err.Error()) + } + ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS / 2)) + + // Look up the first half of the spans. They should be found. + var span *common.Span + for i := 0; i < NUM_TEST_SPANS/2; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + + // Look up the second half of the spans. They should not be found. + for i := NUM_TEST_SPANS / 2; i < NUM_TEST_SPANS; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + if span != nil { + t.Fatalf("Unexpectedly found a span we never write to "+ + "the server: FindSpan(%d) succeeded\n", i) + } + } + + // Test FindChildren + childSpan := allSpans[1] + parentId := childSpan.Parents[0] + var children []common.SpanId + children, err = hcl.FindChildren(parentId, 1) + if err != nil { + t.Fatalf("FindChildren(%s) failed: %s\n", parentId, err.Error()) + } + if len(children) != 1 { + t.Fatalf("FindChildren(%s) returned an invalid number of "+ + "children: expected %d, got %d\n", parentId, 1, len(children)) + } + if !children[0].Equal(childSpan.Id) { + t.Fatalf("FindChildren(%s) returned an invalid child id: expected %s, "+ + " got %s\n", parentId, childSpan.Id, children[0]) + } + + // Test FindChildren on a span that has no children + childlessSpan := allSpans[NUM_TEST_SPANS/2] + children, err = hcl.FindChildren(childlessSpan.Id, 10) + if err != nil { + t.Fatalf("FindChildren(%d) failed: %s\n", childlessSpan.Id, err.Error()) + } + if len(children) != 0 { + t.Fatalf("FindChildren(%d) returned an invalid number of "+ + "children: expected %d, got %d\n", childlessSpan.Id, 0, len(children)) + } + + // Test Query + var query common.Query + query = common.Query{Lim: 10} + spans, err := hcl.Query(&query) + if err != nil { + t.Fatalf("Query({lim: %d}) failed: %s\n", 10, err.Error()) + } + if len(spans) != 10 { + t.Fatalf("Query({lim: %d}) returned an invalid number of "+ + "children: expected %d, got %d\n", 10, 10, len(spans)) + } +} + +func TestDumpAll(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", + DataDirs: make([]string, 2), + WrittenSpans: common.NewSemaphore(0), + Cnf: map[string]string{ + conf.HTRACE_LOG_LEVEL: "INFO", + }, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + + NUM_TEST_SPANS := 100 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + sort.Sort(allSpans) + err = hcl.WriteSpans(allSpans) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS)) + out := make(chan *common.Span, NUM_TEST_SPANS) + var dumpErr error + go func() { + dumpErr = hcl.DumpAll(3, out) + }() + var numSpans int + nextLogTime := time.Now().Add(time.Millisecond * 5) + for { + span, channelOpen := <-out + if !channelOpen { + break + } + common.ExpectSpansEqual(t, allSpans[numSpans], span) + numSpans++ + if testing.Verbose() { + now := time.Now() + if !now.Before(nextLogTime) { + nextLogTime = now + nextLogTime = nextLogTime.Add(time.Millisecond * 5) + fmt.Printf("read back %d span(s)...\n", numSpans) + } + } + } + if numSpans != len(allSpans) { + t.Fatalf("expected to read %d spans... but only read %d\n", + len(allSpans), numSpans) + } + if dumpErr != nil { + t.Fatalf("got dump error %s\n", dumpErr.Error()) + } +} + +const EXAMPLE_CONF_KEY = "example.conf.key" +const EXAMPLE_CONF_VALUE = "foo.bar.baz" + +func TestClientGetServerConf(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerConf", + Cnf: map[string]string{ + EXAMPLE_CONF_KEY: EXAMPLE_CONF_VALUE, + }, + DataDirs: make([]string, 2)} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + serverCnf, err2 := hcl.GetServerConf() + if err2 != nil { + t.Fatalf("failed to call GetServerConf: %s", err2.Error()) + } + if serverCnf[EXAMPLE_CONF_KEY] != EXAMPLE_CONF_VALUE { + t.Fatalf("unexpected value for %s: %s", + EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE) + } +} + +const TEST_NUM_HRPC_HANDLERS = 2 + +const TEST_NUM_WRITESPANS = 4 + +// Tests that HRPC limits the number of simultaneous connections being processed. +func TestHrpcAdmissionsControl(t *testing.T) { + var wg sync.WaitGroup + wg.Add(TEST_NUM_WRITESPANS) + var numConcurrentHrpcCalls int32 + testHooks := &hrpcTestHooks{ + HandleAdmission: func() { + defer wg.Done() + n := atomic.AddInt32(&numConcurrentHrpcCalls, 1) + if n > TEST_NUM_HRPC_HANDLERS { + t.Fatalf("The number of concurrent HRPC calls went above "+ + "%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n) + } + time.Sleep(1 * time.Millisecond) + n = atomic.AddInt32(&numConcurrentHrpcCalls, -1) + if n >= TEST_NUM_HRPC_HANDLERS { + t.Fatalf("The number of concurrent HRPC calls went above "+ + "%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n+1) + } + }, + } + htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl", + DataDirs: make([]string, 2), + Cnf: map[string]string{ + conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), + }, + WrittenSpans: common.NewSemaphore(0), + HrpcTestHooks: testHooks, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + // Create some random trace spans. + allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) + for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { + go func(i int) { + err = hcl.WriteSpans(allSpans[i : i+1]) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + }(iter) + } + wg.Wait() + ht.Store.WrittenSpans.Waits(int64(TEST_NUM_WRITESPANS)) +} + +// Tests that HRPC I/O timeouts work. +func TestHrpcIoTimeout(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout", + DataDirs: make([]string, 2), + Cnf: map[string]string{ + conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), + conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1", + }, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + finishClient := make(chan interface{}) + defer func() { + // Close the finishClient channel, if it hasn't already been closed. + defer func() { recover() }() + close(finishClient) + }() + testHooks := &htrace.TestHooks{ + HandleWriteRequestBody: func() { + <-finishClient + }, + } + hcl, err = htrace.NewClient(ht.ClientConf(), testHooks) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + // Create some random trace spans. + allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) + var wg sync.WaitGroup + wg.Add(TEST_NUM_WRITESPANS) + for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { + go func(i int) { + defer wg.Done() + // Ignore the error return because there are internal retries in + // the client which will make this succeed eventually, usually. + // Keep in mind that we only block until we have seen + // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that, + // we let requests through so that the test can exit cleanly. + hcl.WriteSpans(allSpans[i : i+1]) + }(iter) + } + for { + if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS { + break + } + time.Sleep(1000 * time.Nanosecond) + } + close(finishClient) + wg.Wait() +} + +func doWriteSpans(name string, N int, maxSpansPerRpc uint32, b *testing.B) { + htraceBld := &MiniHTracedBuilder{Name: "doWriteSpans", + Cnf: map[string]string{ + conf.HTRACE_LOG_LEVEL: "INFO", + conf.HTRACE_NUM_HRPC_HANDLERS: "20", + }, + WrittenSpans: common.NewSemaphore(int64(1 - N)), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + rnd := rand.New(rand.NewSource(1)) + allSpans := make([]*common.Span, N) + for n := 0; n < N; n++ { + allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n]) + } + // Determine how many calls to WriteSpans we should make. Each writeSpans + // message should be small enough so that it doesn't exceed the max RPC + // body length limit. TODO: a production-quality golang client would do + // this internally rather than needing us to do it here in the unit test. + bodyLen := (4 * common.MAX_HRPC_BODY_LENGTH) / 5 + reqs := make([][]*common.Span, 0, 4) + curReq := -1 + curReqLen := bodyLen + var curReqSpans uint32 + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + var mbuf [8192]byte + buf := mbuf[:0] + enc := codec.NewEncoderBytes(&buf, mh) + for n := 0; n < N; n++ { + span := allSpans[n] + if (curReqSpans >= maxSpansPerRpc) || + (curReqLen >= bodyLen) { + reqs = append(reqs, make([]*common.Span, 0, 16)) + curReqLen = 0 + curReq++ + curReqSpans = 0 + } + buf = mbuf[:0] + enc.ResetBytes(&buf) + err := enc.Encode(span) + if err != nil { + panic(fmt.Sprintf("Error encoding span %s: %s\n", + span.String(), err.Error())) + } + bufLen := len(buf) + if bufLen > (bodyLen / 5) { + panic(fmt.Sprintf("Span too long at %d bytes\n", bufLen)) + } + curReqLen += bufLen + reqs[curReq] = append(reqs[curReq], span) + curReqSpans++ + } + ht.Store.lg.Infof("num spans: %d. num WriteSpansReq calls: %d\n", N, len(reqs)) + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + panic(fmt.Sprintf("failed to create client: %s", err.Error())) + } + defer hcl.Close() + + // Reset the timer to avoid including the time required to create new + // random spans in the benchmark total. + if b != nil { + b.ResetTimer() + } + + // Write many random spans. + for reqIdx := range reqs { + go func(i int) { + err = hcl.WriteSpans(reqs[i]) + if err != nil { + panic(fmt.Sprintf("failed to send WriteSpans request %d: %s", + i, err.Error())) + } + }(reqIdx) + } + // Wait for all the spans to be written. + ht.Store.WrittenSpans.Wait() +} + +// This is a test of how quickly we can create new spans via WriteSpans RPCs. +// Like BenchmarkDatastoreWrites, it creates b.N spans in the datastore. +// Unlike that benchmark, it sends the spans via RPC. +// Suggested flags for running this: +// -tags unsafe -cpu 16 -benchtime=1m +func BenchmarkWriteSpans(b *testing.B) { + doWriteSpans("BenchmarkWriteSpans", b.N, math.MaxUint32, b) +} + +func TestWriteSpansRpcs(t *testing.T) { + doWriteSpans("TestWriteSpansRpcs", 3000, 1000, nil) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/datastore.go b/htrace-htraced/go/src/htrace/htraced/datastore.go new file mode 100644 index 0000000..26531af --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/datastore.go @@ -0,0 +1,1339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "encoding/hex" + "errors" + "fmt" + "github.com/jmhodges/levigo" + "github.com/ugorji/go/codec" + "htrace/common" + "htrace/conf" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// +// The data store code for HTraced. +// +// This code stores the trace spans. We use levelDB here so that we don't have to store everything +// in memory at all times. The data is sharded across multiple levelDB databases in multiple +// directories. Normally, these multiple directories will be on multiple disk drives. +// +// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data +// coming from many daemons. Durability is not as big a concern as in some data stores, since +// losing a little bit of trace data if htraced goes down is not critical. We use msgpack +// for serialization. We assume that there will be many more writes than reads. +// +// Schema +// w -> ShardInfo +// s[8-byte-big-endian-sid] -> SpanData +// b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {} +// e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {} +// d[8-byte-big-endian-duration][8-byte-big-endian-child-sid] -> {} +// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {} +// +// Note that span IDs are unsigned 64-bit numbers. +// Begin times, end times, and durations are signed 64-bit numbers. +// In order to get LevelDB to properly compare the signed 64-bit quantities, +// we flip the highest bit. This way, we can get leveldb to view negative +// quantities as less than non-negative ones. This also means that we can do +// all queries using unsigned 64-bit math, rather than having to special-case +// the signed fields. +// + +var EMPTY_BYTE_BUF []byte = []byte{} + +const SPAN_ID_INDEX_PREFIX = 's' +const BEGIN_TIME_INDEX_PREFIX = 'b' +const END_TIME_INDEX_PREFIX = 'e' +const DURATION_INDEX_PREFIX = 'd' +const PARENT_ID_INDEX_PREFIX = 'p' +const INVALID_INDEX_PREFIX = 0 + +// The maximum span expiry time, in milliseconds. +// For all practical purposes this is "never" since it's more than a million years. +const MAX_SPAN_EXPIRY_MS = 0x7ffffffffffffff + +type IncomingSpan struct { + // The address that the span was sent from. + Addr string + + // The span. + *common.Span + + // Serialized span data + SpanDataBytes []byte +} + +// A single directory containing a levelDB instance. +type shard struct { + // The data store that this shard is part of + store *dataStore + + // The LevelDB instance. + ldb *levigo.DB + + // The path to the leveldb directory this shard is managing. + path string + + // Incoming requests to write Spans. + incoming chan []*IncomingSpan + + // A channel for incoming heartbeats + heartbeats chan interface{} + + // Tracks whether the shard goroutine has exited. + exited sync.WaitGroup +} + +// Process incoming spans for a shard. +func (shd *shard) processIncoming() { + lg := shd.store.lg + defer func() { + lg.Infof("Shard processor for %s exiting.\n", shd.path) + shd.exited.Done() + }() + for { + select { + case spans := <-shd.incoming: + if spans == nil { + return + } + totalWritten := 0 + totalDropped := 0 + for spanIdx := range spans { + err := shd.writeSpan(spans[spanIdx]) + if err != nil { + lg.Errorf("Shard processor for %s got fatal error %s.\n", + shd.path, err.Error()) + totalDropped++ + } else { + if lg.TraceEnabled() { + lg.Tracef("Shard processor for %s wrote span %s.\n", + shd.path, spans[spanIdx].ToJson()) + } + totalWritten++ + } + } + shd.store.msink.UpdatePersisted(spans[0].Addr, totalWritten, totalDropped) + if shd.store.WrittenSpans != nil { + lg.Debugf("Shard %s incrementing WrittenSpans by %d\n", shd.path, len(spans)) + shd.store.WrittenSpans.Posts(int64(len(spans))) + } + case <-shd.heartbeats: + lg.Tracef("Shard processor for %s handling heartbeat.\n", shd.path) + shd.pruneExpired() + } + } +} + +func (shd *shard) pruneExpired() { + lg := shd.store.rpr.lg + src, err := CreateReaperSource(shd) + if err != nil { + lg.Errorf("Error creating reaper source for shd(%s): %s\n", + shd.path, err.Error()) + return + } + var totalReaped uint64 + defer func() { + src.Close() + if totalReaped > 0 { + atomic.AddUint64(&shd.store.rpr.ReapedSpans, totalReaped) + } + }() + urdate := s2u64(shd.store.rpr.GetReaperDate()) + for { + span := src.next() + if span == nil { + lg.Debugf("After reaping %d span(s), no more found in shard %s "+ + "to reap.\n", totalReaped, shd.path) + return + } + begin := s2u64(span.Begin) + if begin >= urdate { + lg.Debugf("After reaping %d span(s), the remaining spans in "+ + "shard %s are new enough to be kept\n", + totalReaped, shd.path) + return + } + err = shd.DeleteSpan(span) + if err != nil { + lg.Errorf("Error deleting span %s from shd(%s): %s\n", + span.String(), shd.path, err.Error()) + return + } + if lg.TraceEnabled() { + lg.Tracef("Reaped span %s from shard %s\n", span.String(), shd.path) + } + totalReaped++ + } +} + +// Delete a span from the shard. Note that leveldb may retain the data until +// compaction(s) remove it. +func (shd *shard) DeleteSpan(span *common.Span) error { + batch := levigo.NewWriteBatch() + defer batch.Close() + primaryKey := + append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...) + batch.Delete(primaryKey) + for parentIdx := range span.Parents { + key := append(append([]byte{PARENT_ID_INDEX_PREFIX}, + span.Parents[parentIdx].Val()...), span.Id.Val()...) + batch.Delete(key) + } + beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX}, + u64toSlice(s2u64(span.Begin))...), span.Id.Val()...) + batch.Delete(beginTimeKey) + endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX}, + u64toSlice(s2u64(span.End))...), span.Id.Val()...) + batch.Delete(endTimeKey) + durationKey := append(append([]byte{DURATION_INDEX_PREFIX}, + u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...) + batch.Delete(durationKey) + err := shd.ldb.Write(shd.store.writeOpts, batch) + if err != nil { + return err + } + return nil +} + +// Convert a signed 64-bit number into an unsigned 64-bit number. We flip the +// highest bit, so that negative input values map to unsigned numbers which are +// less than non-negative input values. +func s2u64(val int64) uint64 { + ret := uint64(val) + ret ^= 0x8000000000000000 + return ret +} + +func u64toSlice(val uint64) []byte { + return []byte{ + byte(0xff & (val >> 56)), + byte(0xff & (val >> 48)), + byte(0xff & (val >> 40)), + byte(0xff & (val >> 32)), + byte(0xff & (val >> 24)), + byte(0xff & (val >> 16)), + byte(0xff & (val >> 8)), + byte(0xff & (val >> 0))} +} + +func (shd *shard) writeSpan(ispan *IncomingSpan) error { + batch := levigo.NewWriteBatch() + defer batch.Close() + span := ispan.Span + primaryKey := + append([]byte{SPAN_ID_INDEX_PREFIX}, span.Id.Val()...) + batch.Put(primaryKey, ispan.SpanDataBytes) + + // Add this to the parent index. + for parentIdx := range span.Parents { + key := append(append([]byte{PARENT_ID_INDEX_PREFIX}, + span.Parents[parentIdx].Val()...), span.Id.Val()...) + batch.Put(key, EMPTY_BYTE_BUF) + } + + // Add to the other secondary indices. + beginTimeKey := append(append([]byte{BEGIN_TIME_INDEX_PREFIX}, + u64toSlice(s2u64(span.Begin))...), span.Id.Val()...) + batch.Put(beginTimeKey, EMPTY_BYTE_BUF) + endTimeKey := append(append([]byte{END_TIME_INDEX_PREFIX}, + u64toSlice(s2u64(span.End))...), span.Id.Val()...) + batch.Put(endTimeKey, EMPTY_BYTE_BUF) + durationKey := append(append([]byte{DURATION_INDEX_PREFIX}, + u64toSlice(s2u64(span.Duration()))...), span.Id.Val()...) + batch.Put(durationKey, EMPTY_BYTE_BUF) + + err := shd.ldb.Write(shd.store.writeOpts, batch) + if err != nil { + shd.store.lg.Errorf("Error writing span %s to leveldb at %s: %s\n", + span.String(), shd.path, err.Error()) + return err + } + return nil +} + +func (shd *shard) FindChildren(sid common.SpanId, childIds []common.SpanId, + lim int32) ([]common.SpanId, int32, error) { + searchKey := append([]byte{PARENT_ID_INDEX_PREFIX}, sid.Val()...) + iter := shd.ldb.NewIterator(shd.store.readOpts) + defer iter.Close() + iter.Seek(searchKey) + for { + if !iter.Valid() { + break + } + if lim == 0 { + break + } + key := iter.Key() + if !bytes.HasPrefix(key, searchKey) { + break + } + id := common.SpanId(key[17:]) + childIds = append(childIds, id) + lim-- + iter.Next() + } + return childIds, lim, nil +} + +// Close a shard. +func (shd *shard) Close() { + lg := shd.store.lg + shd.incoming <- nil + lg.Infof("Waiting for %s to exit...\n", shd.path) + shd.exited.Wait() + shd.ldb.Close() + lg.Infof("Closed %s...\n", shd.path) +} + +type Reaper struct { + // The logger used by the reaper + lg *common.Logger + + // The number of milliseconds to keep spans around, in milliseconds. + spanExpiryMs int64 + + // The oldest date for which we'll keep spans. + reaperDate int64 + + // A channel used to send heartbeats to the reaper + heartbeats chan interface{} + + // Tracks whether the reaper goroutine has exited + exited sync.WaitGroup + + // The lock protecting reaper data. + lock sync.Mutex + + // The reaper heartbeater + hb *Heartbeater + + // The total number of spans which have been reaped. + ReapedSpans uint64 +} + +func NewReaper(cnf *conf.Config) *Reaper { + rpr := &Reaper{ + lg: common.NewLogger("reaper", cnf), + spanExpiryMs: cnf.GetInt64(conf.HTRACE_SPAN_EXPIRY_MS), + heartbeats: make(chan interface{}, 1), + } + if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS { + rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS + } else if rpr.spanExpiryMs <= 0 { + rpr.spanExpiryMs = MAX_SPAN_EXPIRY_MS + } + rpr.hb = NewHeartbeater("ReaperHeartbeater", + cnf.GetInt64(conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS), rpr.lg) + rpr.exited.Add(1) + go rpr.run() + rpr.hb.AddHeartbeatTarget(&HeartbeatTarget{ + name: "reaper", + targetChan: rpr.heartbeats, + }) + var when string + if rpr.spanExpiryMs >= MAX_SPAN_EXPIRY_MS { + when = "never" + } else { + when = "after " + time.Duration(rpr.spanExpiryMs).String() + } + rpr.lg.Infof("Initializing span reaper: span time out = %s.\n", when) + return rpr +} + +func (rpr *Reaper) run() { + defer func() { + rpr.lg.Info("Exiting Reaper goroutine.\n") + rpr.exited.Done() + }() + + for { + _, isOpen := <-rpr.heartbeats + if !isOpen { + return + } + rpr.handleHeartbeat() + } +} + +func (rpr *Reaper) handleHeartbeat() { + // TODO: check dataStore fullness + now := common.TimeToUnixMs(time.Now().UTC()) + d, updated := func() (int64, bool) { + rpr.lock.Lock() + defer rpr.lock.Unlock() + newReaperDate := now - rpr.spanExpiryMs + if newReaperDate > rpr.reaperDate { + rpr.reaperDate = newReaperDate + return rpr.reaperDate, true + } else { + return rpr.reaperDate, false + } + }() + if rpr.lg.DebugEnabled() { + if updated { + rpr.lg.Debugf("Updating UTC reaper date to %s.\n", + common.UnixMsToTime(d).Format(time.RFC3339)) + } else { + rpr.lg.Debugf("Not updating previous reaperDate of %s.\n", + common.UnixMsToTime(d).Format(time.RFC3339)) + } + } +} + +func (rpr *Reaper) GetReaperDate() int64 { + rpr.lock.Lock() + defer rpr.lock.Unlock() + return rpr.reaperDate +} + +func (rpr *Reaper) SetReaperDate(rdate int64) { + rpr.lock.Lock() + defer rpr.lock.Unlock() + rpr.reaperDate = rdate +} + +func (rpr *Reaper) Shutdown() { + rpr.hb.Shutdown() + close(rpr.heartbeats) +} + +// The Data Store. +type dataStore struct { + lg *common.Logger + + // The shards which manage our LevelDB instances. + shards []*shard + + // The read options to use for LevelDB. + readOpts *levigo.ReadOptions + + // The write options to use for LevelDB. + writeOpts *levigo.WriteOptions + + // If non-null, a semaphore we will increment once for each span we receive. + // Used for testing. + WrittenSpans *common.Semaphore + + // The metrics sink. + msink *MetricsSink + + // The heartbeater which periodically asks shards to update the MetricsSink. + hb *Heartbeater + + // The reaper for this datastore + rpr *Reaper + + // When this datastore was started (in UTC milliseconds since the epoch) + startMs int64 +} + +func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) { + dld := NewDataStoreLoader(cnf) + defer dld.Close() + err := dld.Load() + if err != nil { + dld.lg.Errorf("Error loading datastore: %s\n", err.Error()) + return nil, err + } + store := &dataStore{ + lg: dld.lg, + shards: make([]*shard, len(dld.shards)), + readOpts: dld.readOpts, + writeOpts: dld.writeOpts, + WrittenSpans: writtenSpans, + msink: NewMetricsSink(cnf), + hb: NewHeartbeater("DatastoreHeartbeater", + cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg), + rpr: NewReaper(cnf), + startMs: common.TimeToUnixMs(time.Now().UTC()), + } + spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) + for shdIdx := range store.shards { + shd := &shard{ + store: store, + ldb: dld.shards[shdIdx].ldb, + path: dld.shards[shdIdx].path, + incoming: make(chan []*IncomingSpan, spanBufferSize), + heartbeats: make(chan interface{}, 1), + } + shd.exited.Add(1) + go shd.processIncoming() + store.shards[shdIdx] = shd + store.hb.AddHeartbeatTarget(&HeartbeatTarget{ + name: fmt.Sprintf("shard(%s)", shd.path), + targetChan: shd.heartbeats, + }) + } + dld.DisownResources() + return store, nil +} + +// Close the DataStore. +func (store *dataStore) Close() { + if store.hb != nil { + store.hb.Shutdown() + store.hb = nil + } + for idx := range store.shards { + if store.shards[idx] != nil { + store.shards[idx].Close() + store.shards[idx] = nil + } + } + if store.rpr != nil { + store.rpr.Shutdown() + store.rpr = nil + } + if store.readOpts != nil { + store.readOpts.Close() + store.readOpts = nil + } + if store.writeOpts != nil { + store.writeOpts.Close() + store.writeOpts = nil + } + if store.lg != nil { + store.lg.Close() + store.lg = nil + } +} + +// Get the index of the shard which stores the given spanId. +func (store *dataStore) getShardIndex(sid common.SpanId) int { + return int(sid.Hash32() % uint32(len(store.shards))) +} + +const WRITESPANS_BATCH_SIZE = 128 + +// SpanIngestor is a class used internally to ingest spans from an RPC +// endpoint. It groups spans destined for a particular shard into small +// batches, so that we can reduce the number of objects that need to be sent +// over the shard's "incoming" channel. Since sending objects over a channel +// requires goroutine synchronization, this improves performance. +// +// SpanIngestor also allows us to reuse the same encoder object for many spans, +// rather than creating a new encoder per span. This avoids re-doing the +// encoder setup for each span, and also generates less garbage. +type SpanIngestor struct { + // The logger to use. + lg *common.Logger + + // The dataStore we are ingesting spans into. + store *dataStore + + // The remote address these spans are coming from. + addr string + + // Default TracerId + defaultTrid string + + // The msgpack handle to use to serialize the spans. + mh codec.MsgpackHandle + + // The msgpack encoder to use to serialize the spans. + // Caching this avoids generating a lot of garbage and burning CPUs + // creating new encoder objects for each span. + enc *codec.Encoder + + // The buffer which codec.Encoder is currently serializing to. + // We have to create a new buffer for each span because once we hand it off to the shard, the + // shard manages the buffer lifecycle. + spanDataBytes []byte + + // An array mapping shard index to span batch. + batches []*SpanIngestorBatch + + // The total number of spans ingested. Includes dropped spans. + totalIngested int + + // The total number of spans the ingestor dropped because of a server-side error. + serverDropped int +} + +// A batch of spans destined for a particular shard. +type SpanIngestorBatch struct { + incoming []*IncomingSpan +} + +func (store *dataStore) NewSpanIngestor(lg *common.Logger, + addr string, defaultTrid string) *SpanIngestor { + ing := &SpanIngestor{ + lg: lg, + store: store, + addr: addr, + defaultTrid: defaultTrid, + spanDataBytes: make([]byte, 0, 1024), + batches: make([]*SpanIngestorBatch, len(store.shards)), + } + ing.mh.WriteExt = true + ing.enc = codec.NewEncoderBytes(&ing.spanDataBytes, &ing.mh) + for batchIdx := range ing.batches { + ing.batches[batchIdx] = &SpanIngestorBatch{ + incoming: make([]*IncomingSpan, 0, WRITESPANS_BATCH_SIZE), + } + } + return ing +} + +func (ing *SpanIngestor) IngestSpan(span *common.Span) { + ing.totalIngested++ + // Make sure the span ID is valid. + spanIdProblem := span.Id.FindProblem() + if spanIdProblem != "" { + // Can't print the invalid span ID because String() might fail. + ing.lg.Warnf("Invalid span ID: %s\n", spanIdProblem) + ing.serverDropped++ + return + } + + // Set the default tracer id, if needed. + if span.TracerId == "" { + span.TracerId = ing.defaultTrid + } + + // Encode the span data. Doing the encoding here is better than doing it + // in the shard goroutine, because we can achieve more parallelism. + // There is one shard goroutine per shard, but potentially many more + // ingestors per shard. + err := ing.enc.Encode(span.SpanData) + if err != nil { + ing.lg.Warnf("Failed to encode span ID %s: %s\n", + span.Id.String(), err.Error()) + ing.serverDropped++ + return + } + spanDataBytes := ing.spanDataBytes + ing.spanDataBytes = make([]byte, 0, 1024) + ing.enc.ResetBytes(&ing.spanDataBytes) + + // Determine which shard this span should go to. + shardIdx := ing.store.getShardIndex(span.Id) + batch := ing.batches[shardIdx] + incomingLen := len(batch.incoming) + if ing.lg.TraceEnabled() { + ing.lg.Tracef("SpanIngestor#IngestSpan: spanId=%s, shardIdx=%d, "+ + "incomingLen=%d, cap(batch.incoming)=%d\n", + span.Id.String(), shardIdx, incomingLen, cap(batch.incoming)) + } + if incomingLen+1 == cap(batch.incoming) { + if ing.lg.TraceEnabled() { + ing.lg.Tracef("SpanIngestor#IngestSpan: flushing %d spans for "+ + "shard %d\n", len(batch.incoming), shardIdx) + } + ing.store.WriteSpans(shardIdx, batch.incoming) + batch.incoming = make([]*IncomingSpan, 1, WRITESPANS_BATCH_SIZE) + incomingLen = 0 + } else { + batch.incoming = batch.incoming[0 : incomingLen+1] + } + batch.incoming[incomingLen] = &IncomingSpan{ + Addr: ing.addr, + Span: span, + SpanDataBytes: spanDataBytes, + } +} + +func (ing *SpanIngestor) Close(startTime time.Time) { + for shardIdx := range ing.batches { + batch := ing.batches[shardIdx] + if len(batch.incoming) > 0 { + if ing.lg.TraceEnabled() { + ing.lg.Tracef("SpanIngestor#Close: flushing %d span(s) for "+ + "shard %d\n", len(batch.incoming), shardIdx) + } + ing.store.WriteSpans(shardIdx, batch.incoming) + } + batch.incoming = nil + } + ing.lg.Debugf("Closed span ingestor for %s. Ingested %d span(s); dropped "+ + "%d span(s).\n", ing.addr, ing.totalIngested, ing.serverDropped) + + endTime := time.Now() + ing.store.msink.UpdateIngested(ing.addr, ing.totalIngested, + ing.serverDropped, endTime.Sub(startTime)) +} + +func (store *dataStore) WriteSpans(shardIdx int, ispans []*IncomingSpan) { + store.shards[shardIdx].incoming <- ispans +} + +func (store *dataStore) FindSpan(sid common.SpanId) *common.Span { + return store.shards[store.getShardIndex(sid)].FindSpan(sid) +} + +func (shd *shard) FindSpan(sid common.SpanId) *common.Span { + lg := shd.store.lg + primaryKey := append([]byte{SPAN_ID_INDEX_PREFIX}, sid.Val()...) + buf, err := shd.ldb.Get(shd.store.readOpts, primaryKey) + if err != nil { + if strings.Index(err.Error(), "NotFound:") != -1 { + return nil + } + lg.Warnf("Shard(%s): FindSpan(%s) error: %s\n", + shd.path, sid.String(), err.Error()) + return nil + } + var span *common.Span + span, err = shd.decodeSpan(sid, buf) + if err != nil { + lg.Errorf("Shard(%s): FindSpan(%s) decode error: %s decoding [%s]\n", + shd.path, sid.String(), err.Error(), hex.EncodeToString(buf)) + return nil + } + return span +} + +func (shd *shard) decodeSpan(sid common.SpanId, buf []byte) (*common.Span, error) { + r := bytes.NewBuffer(buf) + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + decoder := codec.NewDecoder(r, mh) + data := common.SpanData{} + err := decoder.Decode(&data) + if err != nil { + return nil, err + } + if data.Parents == nil { + data.Parents = []common.SpanId{} + } + return &common.Span{Id: common.SpanId(sid), SpanData: data}, nil +} + +// Find the children of a given span id. +func (store *dataStore) FindChildren(sid common.SpanId, lim int32) []common.SpanId { + childIds := make([]common.SpanId, 0) + var err error + + startIdx := store.getShardIndex(sid) + idx := startIdx + numShards := len(store.shards) + for { + if lim == 0 { + break + } + shd := store.shards[idx] + childIds, lim, err = shd.FindChildren(sid, childIds, lim) + if err != nil { + store.lg.Errorf("Shard(%s): FindChildren(%s) error: %s\n", + shd.path, sid.String(), err.Error()) + } + idx++ + if idx >= numShards { + idx = 0 + } + if idx == startIdx { + break + } + } + return childIds +} + +type predicateData struct { + *common.Predicate + key []byte +} + +func loadPredicateData(pred *common.Predicate) (*predicateData, error) { + p := predicateData{Predicate: pred} + + // Parse the input value given to make sure it matches up with the field + // type. + switch pred.Field { + case common.SPAN_ID: + // Span IDs are sent as hex strings. + var id common.SpanId + if err := id.FromString(pred.Val); err != nil { + return nil, errors.New(fmt.Sprintf("Unable to parse span id '%s': %s", + pred.Val, err.Error())) + } + p.key = id.Val() + break + case common.DESCRIPTION: + // Any string is valid for a description. + p.key = []byte(pred.Val) + break + case common.BEGIN_TIME, common.END_TIME, common.DURATION: + // Parse a base-10 signed numeric field. + v, err := strconv.ParseInt(pred.Val, 10, 64) + if err != nil { + return nil, errors.New(fmt.Sprintf("Unable to parse %s '%s': %s", + pred.Field, pred.Val, err.Error())) + } + p.key = u64toSlice(s2u64(v)) + break + case common.TRACER_ID: + // Any string is valid for a tracer ID. + p.key = []byte(pred.Val) + break + default: + return nil, errors.New(fmt.Sprintf("Unknown field %s", pred.Field)) + } + + // Validate the predicate operation. + switch pred.Op { + case common.EQUALS, common.LESS_THAN_OR_EQUALS, + common.GREATER_THAN_OR_EQUALS, common.GREATER_THAN: + break + case common.CONTAINS: + if p.fieldIsNumeric() { + return nil, errors.New(fmt.Sprintf("Can't use CONTAINS on a "+ + "numeric field like '%s'", pred.Field)) + } + default: + return nil, errors.New(fmt.Sprintf("Unknown predicate operation '%s'", + pred.Op)) + } + + return &p, nil +} + +// Get the index prefix for this predicate, or 0 if it is not indexed. +func (pred *predicateData) getIndexPrefix() byte { + switch pred.Field { + case common.SPAN_ID: + return SPAN_ID_INDEX_PREFIX + case common.BEGIN_TIME: + return BEGIN_TIME_INDEX_PREFIX + case common.END_TIME: + return END_TIME_INDEX_PREFIX + case common.DURATION: + return DURATION_INDEX_PREFIX + default: + return INVALID_INDEX_PREFIX + } +} + +// Returns true if the predicate type is numeric. +func (pred *predicateData) fieldIsNumeric() bool { + switch pred.Field { + case common.SPAN_ID, common.BEGIN_TIME, common.END_TIME, common.DURATION: + return true + default: + return false + } +} + +// Get the values that this predicate cares about for a given span. +func (pred *predicateData) extractRelevantSpanData(span *common.Span) []byte { + switch pred.Field { + case common.SPAN_ID: + return span.Id.Val() + case common.DESCRIPTION: + return []byte(span.Description) + case common.BEGIN_TIME: + return u64toSlice(s2u64(span.Begin)) + case common.END_TIME: + return u64toSlice(s2u64(span.End)) + case common.DURATION: + return u64toSlice(s2u64(span.Duration())) + case common.TRACER_ID: + return []byte(span.TracerId) + default: + panic(fmt.Sprintf("Unknown field type %s.", pred.Field)) + } +} + +func (pred *predicateData) spanPtrIsBefore(a *common.Span, b *common.Span) bool { + // nil is after everything. + if a == nil { + if b == nil { + return false + } + return false + } else if b == nil { + return true + } + // Compare the spans according to this predicate. + aVal := pred.extractRelevantSpanData(a) + bVal := pred.extractRelevantSpanData(b) + cmp := bytes.Compare(aVal, bVal) + if pred.Op.IsDescending() { + return cmp > 0 + } else { + return cmp < 0 + } +} + +type satisfiedByReturn int + +const ( + NOT_SATISFIED satisfiedByReturn = iota + NOT_YET_SATISFIED = iota + SATISFIED = iota +) + +func (r satisfiedByReturn) String() string { + switch r { + case NOT_SATISFIED: + return "NOT_SATISFIED" + case NOT_YET_SATISFIED: + return "NOT_YET_SATISFIED" + case SATISFIED: + return "SATISFIED" + default: + return "(unknown)" + } +} + +// Determine whether the predicate is satisfied by the given span. +func (pred *predicateData) satisfiedBy(span *common.Span) satisfiedByReturn { + val := pred.extractRelevantSpanData(span) + switch pred.Op { + case common.CONTAINS: + if bytes.Contains(val, pred.key) { + return SATISFIED + } else { + return NOT_SATISFIED + } + case common.EQUALS: + if bytes.Equal(val, pred.key) { + return SATISFIED + } else { + return NOT_SATISFIED + } + case common.LESS_THAN_OR_EQUALS: + if bytes.Compare(val, pred.key) <= 0 { + return SATISFIED + } else { + return NOT_YET_SATISFIED + } + case common.GREATER_THAN_OR_EQUALS: + if bytes.Compare(val, pred.key) >= 0 { + return SATISFIED + } else { + return NOT_SATISFIED + } + case common.GREATER_THAN: + cmp := bytes.Compare(val, pred.key) + if cmp <= 0 { + return NOT_YET_SATISFIED + } else { + return SATISFIED + } + default: + panic(fmt.Sprintf("unknown Op type %s should have been caught "+ + "during normalization", pred.Op)) + } +} + +func (pred *predicateData) createSource(store *dataStore, prev *common.Span) (*source, error) { + var ret *source + src := source{store: store, + pred: pred, + shards: make([]*shard, len(store.shards)), + iters: make([]*levigo.Iterator, 0, len(store.shards)), + nexts: make([]*common.Span, len(store.shards)), + numRead: make([]int, len(store.shards)), + keyPrefix: pred.getIndexPrefix(), + } + if src.keyPrefix == INVALID_INDEX_PREFIX { + return nil, errors.New(fmt.Sprintf("Can't create source from unindexed "+ + "predicate on field %s", pred.Field)) + } + defer func() { + if ret == nil { + src.Close() + } + }() + for shardIdx := range store.shards { + shd := store.shards[shardIdx] + src.shards[shardIdx] = shd + src.iters = append(src.iters, shd.ldb.NewIterator(store.readOpts)) + } + var searchKey []byte + lg := store.lg + if prev != nil { + // If prev != nil, this query RPC is the continuation of a previous + // one. The final result returned the last time is 'prev'. + // + // To avoid returning the same results multiple times, we adjust the + // predicate here. If the predicate is on the span id field, we + // simply manipulate the span ID we're looking for. + // + // If the predicate is on a secondary index, we also use span ID, but + // in a slightly different way. Since the secondary indices are + // organized as [type-code][8b-secondary-key][8b-span-id], elements + // with the same secondary index field are ordered by span ID. So we + // create a 17-byte key incorporating the span ID from 'prev.' + startId := common.INVALID_SPAN_ID + switch pred.Op { + case common.EQUALS: + if pred.Field == common.SPAN_ID { + // This is an annoying corner case. There can only be one + // result each time we do an EQUALS search for a span id. + // Span id is the primary key for all our spans. + // But for some reason someone is asking for another result. + // We modify the query to search for the illegal 0 span ID, + // which will never be present. + if lg.DebugEnabled() { + lg.Debugf("Attempted to use a continuation token with an EQUALS "+ + "SPAN_ID query. %s. Setting search id = 0", + pred.Predicate.String()) + } + startId = common.INVALID_SPAN_ID + } else { + // When doing an EQUALS search on a secondary index, the + // results are sorted by span id. + startId = prev.Id.Next() + } + case common.LESS_THAN_OR_EQUALS: + // Subtract one from the previous span id. Since the previous + // start ID will never be 0 (0 is an illegal span id), we'll never + // wrap around when doing this. + startId = prev.Id.Prev() + case common.GREATER_THAN_OR_EQUALS: + // We can't add one to the span id, since the previous span ID + // might be the maximum value. So just switch over to using + // GREATER_THAN. + pred.Op = common.GREATER_THAN + startId = prev.Id + case common.GREATER_THAN: + // This one is easy. + startId = prev.Id + default: + str := fmt.Sprintf("Can't use a %v predicate as a source.", pred.Predicate.String()) + lg.Error(str + "\n") + panic(str) + } + if pred.Field == common.SPAN_ID { + pred.key = startId.Val() + searchKey = append([]byte{src.keyPrefix}, startId.Val()...) + } else { + // Start where the previous query left off. This means adjusting + // our uintKey. + pred.key = pred.extractRelevantSpanData(prev) + searchKey = append(append([]byte{src.keyPrefix}, pred.key...), + startId.Val()...) + } + if lg.TraceEnabled() { + lg.Tracef("Handling continuation token %s for %s. startId=%d, "+ + "pred.uintKey=%s\n", prev, pred.Predicate.String(), startId, + hex.EncodeToString(pred.key)) + } + } else { + searchKey = append([]byte{src.keyPrefix}, pred.key...) + } + for i := range src.iters { + src.iters[i].Seek(searchKey) + } + ret = &src + return ret, nil +} + +// A source of spans. +type source struct { + store *dataStore + pred *predicateData + shards []*shard + iters []*levigo.Iterator + nexts []*common.Span + numRead []int + keyPrefix byte +} + +func CreateReaperSource(shd *shard) (*source, error) { + store := shd.store + p := &common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: common.INVALID_SPAN_ID.String(), + } + pred, err := loadPredicateData(p) + if err != nil { + return nil, err + } + src := &source{ + store: store, + pred: pred, + shards: []*shard{shd}, + iters: make([]*levigo.Iterator, 1), + nexts: make([]*common.Span, 1), + numRead: make([]int, 1), + keyPrefix: pred.getIndexPrefix(), + } + iter := shd.ldb.NewIterator(store.readOpts) + src.iters[0] = iter + searchKey := append(append([]byte{src.keyPrefix}, pred.key...), + pred.key...) + iter.Seek(searchKey) + return src, nil +} + +// Fill in the entry in the 'next' array for a specific shard. +func (src *source) populateNextFromShard(shardIdx int) { + lg := src.store.lg + var err error + iter := src.iters[shardIdx] + shdPath := src.shards[shardIdx].path + if iter == nil { + lg.Debugf("Can't populate: No more entries in shard %s\n", shdPath) + return // There are no more entries in this shard. + } + if src.nexts[shardIdx] != nil { + lg.Debugf("No need to populate shard %s\n", shdPath) + return // We already have a valid entry for this shard. + } + for { + if !iter.Valid() { + lg.Debugf("Can't populate: Iterator for shard %s is no longer valid.\n", shdPath) + break // Can't read past end of DB + } + src.numRead[shardIdx]++ + key := iter.Key() + if len(key) < 1 { + lg.Warnf("Encountered invalid zero-byte key in shard %s.\n", shdPath) + break + } + ret := src.checkKeyPrefix(key[0], iter) + if ret == NOT_SATISFIED { + break // Can't read past end of indexed section + } else if ret == NOT_YET_SATISFIED { + if src.pred.Op.IsDescending() { + iter.Prev() + } else { + iter.Next() + } + continue // Try again because we are not yet at the indexed section. + } + var span *common.Span + var sid common.SpanId + if src.keyPrefix == SPAN_ID_INDEX_PREFIX { + // The span id maps to the span itself. + sid = common.SpanId(key[1:17]) + span, err = src.shards[shardIdx].decodeSpan(sid, iter.Value()) + if err != nil { + if lg.DebugEnabled() { + lg.Debugf("Internal error decoding span %s in shard %s: %s\n", + sid.String(), shdPath, err.Error()) + } + break + } + } else { + // With a secondary index, we have to look up the span by id. + sid = common.SpanId(key[9:25]) + span = src.shards[shardIdx].FindSpan(sid) + if span == nil { + if lg.DebugEnabled() { + lg.Debugf("Internal error rehydrating span %s in shard %s\n", + sid.String(), shdPath) + } + break + } + } + if src.pred.Op.IsDescending() { + iter.Prev() + } else { + iter.Next() + } + ret = src.pred.satisfiedBy(span) + if ret == SATISFIED { + if lg.DebugEnabled() { + lg.Debugf("Populated valid span %v from shard %s.\n", sid, shdPath) + } + src.nexts[shardIdx] = span // Found valid entry + return + } + if ret == NOT_SATISFIED { + // This and subsequent entries don't satisfy predicate + break + } + } + lg.Debugf("Closing iterator for shard %s.\n", shdPath) + iter.Close() + src.iters[shardIdx] = nil +} + +// Check the key prefix against the key prefix of the query. +func (src *source) checkKeyPrefix(kp byte, iter *levigo.Iterator) satisfiedByReturn { + if kp == src.keyPrefix { + return SATISFIED + } else if kp < src.keyPrefix { + if src.pred.Op.IsDescending() { + return NOT_SATISFIED + } else { + return NOT_YET_SATISFIED + } + } else { + if src.pred.Op.IsDescending() { + return NOT_YET_SATISFIED + } else { + return NOT_SATISFIED + } + } +} + +func (src *source) next() *common.Span { + for shardIdx := range src.shards { + src.populateNextFromShard(shardIdx) + } + var best *common.Span + bestIdx := -1 + for shardIdx := range src.iters { + span := src.nexts[shardIdx] + if src.pred.spanPtrIsBefore(span, best) { + best = span + bestIdx = shardIdx + } + } + if bestIdx >= 0 { + src.nexts[bestIdx] = nil + } + return best +} + +func (src *source) Close() { + for i := range src.iters { + if src.iters[i] != nil { + src.iters[i].Close() + } + } + src.iters = nil +} + +func (src *source) getStats() string { + ret := fmt.Sprintf("Source stats: pred = %s", src.pred.String()) + prefix := ". " + for shardIdx := range src.shards { + next := fmt.Sprintf("%sRead %d spans from %s", prefix, + src.numRead[shardIdx], src.shards[shardIdx].path) + prefix = ", " + ret = ret + next + } + return ret +} + +func (store *dataStore) obtainSource(preds *[]*predicateData, span *common.Span) (*source, error) { + // Read spans from the first predicate that is indexed. + p := *preds + for i := range p { + pred := p[i] + if pred.getIndexPrefix() != INVALID_INDEX_PREFIX { + *preds = append(p[0:i], p[i+1:]...) + return pred.createSource(store, span) + } + } + // If there are no predicates that are indexed, read rows in order of span id. + spanIdPred := common.Predicate{Op: common.GREATER_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: common.INVALID_SPAN_ID.String(), + } + spanIdPredData, err := loadPredicateData(&spanIdPred) + if err != nil { + return nil, err + } + return spanIdPredData.createSource(store, span) +} + +func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error, []int) { + lg := store.lg + // Parse predicate data. + var err error + preds := make([]*predicateData, len(query.Predicates)) + for i := range query.Predicates { + preds[i], err = loadPredicateData(&query.Predicates[i]) + if err != nil { + return nil, err, nil + } + } + // Get a source of rows. + var src *source + src, err = store.obtainSource(&preds, query.Prev) + if err != nil { + return nil, err, nil + } + defer src.Close() + if lg.DebugEnabled() { + lg.Debugf("HandleQuery %s: preds = %s, src = %v\n", query, preds, src) + } + + // Filter the spans through the remaining predicates. + reserved := 32 + if query.Lim < reserved { + reserved = query.Lim + } + ret := make([]*common.Span, 0, reserved) + for { + if len(ret) >= query.Lim { + if lg.DebugEnabled() { + lg.Debugf("HandleQuery %s: hit query limit after obtaining "+ + "%d results. %s\n.", query, query.Lim, src.getStats()) + } + break // we hit the result size limit + } + span := src.next() + if span == nil { + if lg.DebugEnabled() { + lg.Debugf("HandleQuery %s: found %d result(s), which are "+ + "all that exist. %s\n", query, len(ret), src.getStats()) + } + break // the source has no more spans to give + } + if lg.DebugEnabled() { + lg.Debugf("src.next returned span %s\n", span.ToJson()) + } + satisfied := true + for predIdx := range preds { + if preds[predIdx].satisfiedBy(span) != SATISFIED { + satisfied = false + break + } + } + if satisfied { + ret = append(ret, span) + } + } + return ret, nil, src.numRead +} + +func (store *dataStore) ServerStats() *common.ServerStats { + serverStats := common.ServerStats{ + Dirs: make([]common.StorageDirectoryStats, len(store.shards)), + } + for shardIdx := range store.shards { + shard := store.shards[shardIdx] + serverStats.Dirs[shardIdx].Path = shard.path + r := levigo.Range{ + Start: []byte{0}, + Limit: []byte{0xff}, + } + vals := shard.ldb.GetApproximateSizes([]levigo.Range{r}) + serverStats.Dirs[shardIdx].ApproximateBytes = vals[0] + serverStats.Dirs[shardIdx].LevelDbStats = + shard.ldb.PropertyValue("leveldb.stats") + store.msink.lg.Debugf("levedb.stats for %s: %s\n", + shard.path, shard.ldb.PropertyValue("leveldb.stats")) + } + serverStats.LastStartMs = store.startMs + serverStats.CurMs = common.TimeToUnixMs(time.Now().UTC()) + serverStats.ReapedSpans = atomic.LoadUint64(&store.rpr.ReapedSpans) + store.msink.PopulateServerStats(&serverStats) + return &serverStats +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/htrace/htraced/datastore_test.go new file mode 100644 index 0000000..a7ecead --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/datastore_test.go @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "encoding/json" + htrace "htrace/client" + "htrace/common" + "htrace/conf" + "htrace/test" + "math/rand" + "os" + "reflect" + "sort" + "testing" + "time" +) + +// Test creating and tearing down a datastore. +func TestCreateDatastore(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", + DataDirs: make([]string, 3)} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() +} + +var SIMPLE_TEST_SPANS []common.Span = []common.Span{ + common.Span{Id: common.TestId("00000000000000000000000000000001"), + SpanData: common.SpanData{ + Begin: 123, + End: 456, + Description: "getFileDescriptors", + Parents: []common.SpanId{}, + TracerId: "firstd", + }}, + common.Span{Id: common.TestId("00000000000000000000000000000002"), + SpanData: common.SpanData{ + Begin: 125, + End: 200, + Description: "openFd", + Parents: []common.SpanId{common.TestId("00000000000000000000000000000001")}, + TracerId: "secondd", + }}, + common.Span{Id: common.TestId("00000000000000000000000000000003"), + SpanData: common.SpanData{ + Begin: 200, + End: 456, + Description: "passFd", + Parents: []common.SpanId{common.TestId("00000000000000000000000000000001")}, + TracerId: "thirdd", + }}, +} + +func createSpans(spans []common.Span, store *dataStore) { + ing := store.NewSpanIngestor(store.lg, "127.0.0.1", "") + for idx := range spans { + ing.IngestSpan(&spans[idx]) + } + ing.Close(time.Now()) + store.WrittenSpans.Waits(int64(len(spans))) +} + +// Test creating a datastore and adding some spans. +func TestDatastoreWriteAndRead(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + + span := ht.Store.FindSpan(common.TestId("00000000000000000000000000000001")) + if span == nil { + t.Fatal() + } + if !span.Id.Equal(common.TestId("00000000000000000000000000000001")) { + t.Fatal() + } + common.ExpectSpansEqual(t, &SIMPLE_TEST_SPANS[0], span) + children := ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 1) + if len(children) != 1 { + t.Fatalf("expected 1 child, but got %d\n", len(children)) + } + children = ht.Store.FindChildren(common.TestId("00000000000000000000000000000001"), 2) + if len(children) != 2 { + t.Fatalf("expected 2 children, but got %d\n", len(children)) + } + sort.Sort(common.SpanIdSlice(children)) + if !children[0].Equal(common.TestId("00000000000000000000000000000002")) { + t.Fatal() + } + if !children[1].Equal(common.TestId("00000000000000000000000000000003")) { + t.Fatal() + } +} + +func testQuery(t *testing.T, ht *MiniHTraced, query *common.Query, + expectedSpans []common.Span) { + testQueryExt(t, ht, query, expectedSpans, nil) +} + +func testQueryExt(t *testing.T, ht *MiniHTraced, query *common.Query, + expectedSpans []common.Span, expectedNumScanned []int) { + spans, err, numScanned := ht.Store.HandleQuery(query) + if err != nil { + t.Fatalf("Query %s failed: %s\n", query.String(), err.Error()) + } + expectedBuf := new(bytes.Buffer) + dec := json.NewEncoder(expectedBuf) + err = dec.Encode(expectedSpans) + if err != nil { + t.Fatalf("Failed to encode expectedSpans to JSON: %s\n", err.Error()) + } + spansBuf := new(bytes.Buffer) + dec = json.NewEncoder(spansBuf) + err = dec.Encode(spans) + if err != nil { + t.Fatalf("Failed to encode result spans to JSON: %s\n", err.Error()) + } + t.Logf("len(spans) = %d, len(expectedSpans) = %d\n", len(spans), + len(expectedSpans)) + common.ExpectStrEqual(t, string(expectedBuf.Bytes()), string(spansBuf.Bytes())) + if expectedNumScanned != nil { + if !reflect.DeepEqual(expectedNumScanned, numScanned) { + t.Fatalf("Invalid values for numScanned: got %v, expected %v\n", + expectedNumScanned, numScanned) + } + } +} + +// Test queries on the datastore. +func TestSimpleQuery(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestSimpleQuery", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) +} + +func TestQueries2(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries2", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "125", + }, + common.Predicate{ + Op: common.EQUALS, + Field: common.DESCRIPTION, + Val: "getFileDescriptors", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[0]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.DESCRIPTION, + Val: "getFileDescriptors", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[0]}) +} + +func TestQueries3(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries3", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.CONTAINS, + Field: common.DESCRIPTION, + Val: "Fd", + }, + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.BEGIN_TIME, + Val: "100", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: common.TestId("00000000000000000000000000000000").String(), + }, + }, + Lim: 200, + }, []common.Span{}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: common.TestId("00000000000000000000000000000002").String(), + }, + }, + Lim: 200, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[0]}) +} + +func TestQueries4(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries4", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.BEGIN_TIME, + Val: "125", + }, + }, + Lim: 5, + }, []common.Span{SIMPLE_TEST_SPANS[2]}) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN_OR_EQUALS, + Field: common.DESCRIPTION, + Val: "openFd", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.DESCRIPTION, + Val: "openFd", + }, + }, + Lim: 2, + }, []common.Span{SIMPLE_TEST_SPANS[2]}) +} + +var TEST_QUERIES5_SPANS []common.Span = []common.Span{ + common.Span{Id: common.TestId("10000000000000000000000000000001"), + SpanData: common.SpanData{ + Begin: 123, + End: 456, + Description: "span1", + Parents: []common.SpanId{}, + TracerId: "myTracer", + }}, + common.Span{Id: common.TestId("10000000000000000000000000000002"), + SpanData: common.SpanData{ + Begin: 123, + End: 200, + Description: "span2", + Parents: []common.SpanId{common.TestId("10000000000000000000000000000001")}, + TracerId: "myTracer", + }}, + common.Span{Id: common.TestId("10000000000000000000000000000003"), + SpanData: common.SpanData{ + Begin: 124, + End: 457, + Description: "span3", + Parents: []common.SpanId{common.TestId("10000000000000000000000000000001")}, + TracerId: "myTracer", + }}, +} + +func TestQueries5(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueries5", + WrittenSpans: common.NewSemaphore(0), + DataDirs: make([]string, 1), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(TEST_QUERIES5_SPANS, ht.Store) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.BEGIN_TIME, + Val: "123", + }, + }, + Lim: 5, + }, []common.Span{TEST_QUERIES5_SPANS[2]}) + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.END_TIME, + Val: "200", + }, + }, + Lim: 500, + }, []common.Span{TEST_QUERIES5_SPANS[0], TEST_QUERIES5_SPANS[2]}) + + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.END_TIME, + Val: "999", + }, + }, + Lim: 500, + }, []common.Span{TEST_QUERIES5_SPANS[2], + TEST_QUERIES5_SPANS[0], + TEST_QUERIES5_SPANS[1], + }) +} + +func BenchmarkDatastoreWrites(b *testing.B) { + htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + conf.HTRACE_LOG_LEVEL: "INFO", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + b.Fatalf("Error creating MiniHTraced: %s\n", err.Error()) + } + ht.Store.lg.Infof("BenchmarkDatastoreWrites: b.N = %d\n", b.N) + defer func() { + if r := recover(); r != nil { + ht.Store.lg.Infof("panic: %s\n", r.(error)) + } + ht.Close() + }() + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + allSpans := make([]*common.Span, b.N) + for n := range allSpans { + allSpans[n] = test.NewRandomSpan(rnd, allSpans[0:n]) + } + + // Reset the timer to avoid including the time required to create new + // random spans in the benchmark total. + b.ResetTimer() + + // Write many random spans. + ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "") + for n := 0; n < b.N; n++ { + ing.IngestSpan(allSpans[n]) + } + ing.Close(time.Now()) + // Wait for all the spans to be written. + ht.Store.WrittenSpans.Waits(int64(b.N)) + assertNumWrittenEquals(b, ht.Store.msink, b.N) +} + +func verifySuccessfulLoad(t *testing.T, allSpans common.SpanSlice, + dataDirs []string) { + htraceBld := &MiniHTracedBuilder{ + Name: "TestReloadDataStore#verifySuccessfulLoad", + DataDirs: dataDirs, + KeepDataDirsOnClose: true, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + for i := 0; i < len(allSpans); i++ { + span, err := hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + // Look up the spans we wrote. + var span *common.Span + for i := 0; i < len(allSpans); i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } +} + +func verifyFailedLoad(t *testing.T, dataDirs []string, expectedErr string) { + htraceBld := &MiniHTracedBuilder{ + Name: "TestReloadDataStore#verifyFailedLoad", + DataDirs: dataDirs, + KeepDataDirsOnClose: true, + } + _, err := htraceBld.Build() + if err == nil { + t.Fatalf("expected failure to load, but the load succeeded.") + } + common.AssertErrContains(t, err, expectedErr) +} + +func TestReloadDataStore(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + DataDirs: make([]string, 2), + KeepDataDirsOnClose: true, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + dataDirs := make([]string, len(ht.DataDirs)) + copy(dataDirs, ht.DataDirs) + defer func() { + if ht != nil { + ht.Close() + } + for i := range dataDirs { + os.RemoveAll(dataDirs[i]) + } + }() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + hcnf := ht.Cnf.Clone() + + // Create some random trace spans. + NUM_TEST_SPANS := 5 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + err = hcl.WriteSpans(allSpans) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + ht.Store.WrittenSpans.Waits(int64(NUM_TEST_SPANS)) + + // Look up the spans we wrote. + var span *common.Span + for i := 0; i < NUM_TEST_SPANS; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + hcl.Close() + ht.Close() + ht = nil + + // Verify that we can reload the datastore, even if we configure the data + // directories in a different order. + verifySuccessfulLoad(t, allSpans, []string{dataDirs[1], dataDirs[0]}) + + // If we try to reload the datastore with only one directory, it won't work + // (we need both). + verifyFailedLoad(t, []string{dataDirs[1]}, + "The TotalShards field of all shards is 2, but we have 1 shards.") + + // Test that we give an intelligent error message when 0 directories are + // configured. + verifyFailedLoad(t, []string{}, "No shard directories found.") + + // Can't specify the same directory more than once... will get "lock + // already held by process" + verifyFailedLoad(t, []string{dataDirs[0], dataDirs[1], dataDirs[1]}, + " already held by process.") + + // Open the datastore and modify it to have the wrong DaemonId + dld := NewDataStoreLoader(hcnf) + defer func() { + if dld != nil { + dld.Close() + dld = nil + } + }() + dld.LoadShards() + sinfo, err := dld.shards[0].readShardInfo() + if err != nil { + t.Fatalf("error reading shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + newDaemonId := sinfo.DaemonId + 1 + dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x\n.", + asJson(sinfo), dld.shards[0].path, newDaemonId) + sinfo.DaemonId = newDaemonId + err = dld.shards[0].writeShardInfo(sinfo) + if err != nil { + t.Fatalf("error writing shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + dld.Close() + dld = nil + verifyFailedLoad(t, dataDirs, "DaemonId mismatch.") + + // Open the datastore and modify it to have the wrong TotalShards + dld = NewDataStoreLoader(hcnf) + dld.LoadShards() + sinfo, err = dld.shards[0].readShardInfo() + if err != nil { + t.Fatalf("error reading shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + newDaemonId = sinfo.DaemonId - 1 + dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x, "+ + "TotalShards to 3\n.", + asJson(sinfo), dld.shards[0].path, newDaemonId) + sinfo.DaemonId = newDaemonId + sinfo.TotalShards = 3 + err = dld.shards[0].writeShardInfo(sinfo) + if err != nil { + t.Fatalf("error writing shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + dld.Close() + dld = nil + verifyFailedLoad(t, dataDirs, "TotalShards mismatch.") + + // Open the datastore and modify it to have the wrong LayoutVersion + dld = NewDataStoreLoader(hcnf) + dld.LoadShards() + for shardIdx := range dld.shards { + sinfo, err = dld.shards[shardIdx].readShardInfo() + if err != nil { + t.Fatalf("error reading shard info for shard %s: %s\n", + dld.shards[shardIdx].path, err.Error()) + } + dld.lg.Infof("Read %s from shard %s. Changing TotalShards to 2, "+ + "LayoutVersion to 2\n", asJson(sinfo), dld.shards[shardIdx].path) + sinfo.TotalShards = 2 + sinfo.LayoutVersion = 2 + err = dld.shards[shardIdx].writeShardInfo(sinfo) + if err != nil { + t.Fatalf("error writing shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + } + dld.Close() + dld = nil + verifyFailedLoad(t, dataDirs, "The layout version of all shards is 2, "+ + "but we only support") + + // It should work with data.store.clear set. + htraceBld = &MiniHTracedBuilder{ + Name: "TestReloadDataStore#clear", + DataDirs: dataDirs, + KeepDataDirsOnClose: true, + Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}, + } + ht, err = htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } +} + +func TestQueriesWithContinuationTokens1(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueriesWithContinuationTokens1", + Cnf: map[string]string{ + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "30000", + }, + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) + // Adding a prev value to this query excludes the first result that we + // would normally get. + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.BEGIN_TIME, + Val: "120", + }, + }, + Lim: 5, + Prev: &SIMPLE_TEST_SPANS[0], + }, []common.Span{SIMPLE_TEST_SPANS[1], SIMPLE_TEST_SPANS[2]}) + + // There is only one result from an EQUALS query on SPAN_ID. + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.SPAN_ID, + Val: common.TestId("00000000000000000000000000000001").String(), + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[0], + }, []common.Span{}) + + // When doing a LESS_THAN_OR_EQUALS search, we still don't get back the + // span we pass as a continuation token. (Primary index edition). + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.LESS_THAN_OR_EQUALS, + Field: common.SPAN_ID, + Val: common.TestId("00000000000000000000000000000002").String(), + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[1], + }, []common.Span{SIMPLE_TEST_SPANS[0]}) + + // When doing a GREATER_THAN_OR_EQUALS search, we still don't get back the + // span we pass as a continuation token. (Secondary index edition). + testQuery(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.GREATER_THAN, + Field: common.DURATION, + Val: "0", + }, + }, + Lim: 100, + Prev: &SIMPLE_TEST_SPANS[1], + }, []common.Span{SIMPLE_TEST_SPANS[2], SIMPLE_TEST_SPANS[0]}) +} + +func TestQueryRowsScanned(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestQueryRowsScanned", + WrittenSpans: common.NewSemaphore(0), + } + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + assertNumWrittenEquals(t, ht.Store.msink, len(SIMPLE_TEST_SPANS)) + testQueryExt(t, ht, &common.Query{ + Predicates: []common.Predicate{ + common.Predicate{ + Op: common.EQUALS, + Field: common.SPAN_ID, + Val: common.TestId("00000000000000000000000000000001").String(), + }, + }, + Lim: 100, + Prev: nil, + }, []common.Span{SIMPLE_TEST_SPANS[0]}, + []int{2, 1}) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/heartbeater.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/heartbeater.go b/htrace-htraced/go/src/htrace/htraced/heartbeater.go new file mode 100644 index 0000000..3f4c951 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/heartbeater.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "htrace/common" + "sync" + "time" +) + +type Heartbeater struct { + // The name of this heartbeater + name string + + // How long to sleep between heartbeats, in milliseconds. + periodMs int64 + + // The logger to use. + lg *common.Logger + + // The channels to send the heartbeat on. + targets []HeartbeatTarget + + // Incoming requests to the heartbeater. When this is closed, the + // heartbeater will exit. + req chan *HeartbeatTarget + + wg sync.WaitGroup +} + +type HeartbeatTarget struct { + // The name of the heartbeat target. + name string + + // The channel for the heartbeat target. + targetChan chan interface{} +} + +func (tgt *HeartbeatTarget) String() string { + return tgt.name +} + +func NewHeartbeater(name string, periodMs int64, lg *common.Logger) *Heartbeater { + hb := &Heartbeater{ + name: name, + periodMs: periodMs, + lg: lg, + targets: make([]HeartbeatTarget, 0, 4), + req: make(chan *HeartbeatTarget), + } + hb.wg.Add(1) + go hb.run() + return hb +} + +func (hb *Heartbeater) AddHeartbeatTarget(tgt *HeartbeatTarget) { + hb.req <- tgt +} + +func (hb *Heartbeater) Shutdown() { + close(hb.req) + hb.wg.Wait() +} + +func (hb *Heartbeater) String() string { + return hb.name +} + +func (hb *Heartbeater) run() { + defer func() { + hb.lg.Debugf("%s: exiting.\n", hb.String()) + hb.wg.Done() + }() + period := time.Duration(hb.periodMs) * time.Millisecond + for { + periodEnd := time.Now().Add(period) + for { + timeToWait := periodEnd.Sub(time.Now()) + if timeToWait <= 0 { + break + } else if timeToWait > period { + // Smooth over jitter or clock changes + timeToWait = period + periodEnd = time.Now().Add(period) + } + select { + case tgt, open := <-hb.req: + if !open { + return + } + hb.targets = append(hb.targets, *tgt) + hb.lg.Debugf("%s: added %s.\n", hb.String(), tgt.String()) + case <-time.After(timeToWait): + } + } + for targetIdx := range hb.targets { + select { + case hb.targets[targetIdx].targetChan <- nil: + default: + // We failed to send a heartbeat because the other goroutine was busy and + // hasn't cleared the previous one from its channel. This could indicate a + // stuck goroutine. + hb.lg.Infof("%s: could not send heartbeat to %s.\n", + hb.String(), hb.targets[targetIdx]) + } + } + } +}
