Repository: incubator-htrace Updated Branches: refs/heads/master db4394d83 -> 3377f0468
HTRACE-116. Fix htraced's data.store.clear option. (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/3377f046 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/3377f046 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/3377f046 Branch: refs/heads/master Commit: 3377f046897cb1eea32b9fdbff1239bf7af52530 Parents: db4394d Author: Colin P. Mccabe <[email protected]> Authored: Wed Feb 25 15:28:13 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Fri Feb 27 15:04:19 2015 -0800 ---------------------------------------------------------------------- .../org/apache/htrace/htraced/client_test.go | 9 +- .../src/org/apache/htrace/htraced/datastore.go | 163 ++++++++++++------- .../org/apache/htrace/htraced/datastore_test.go | 102 +++++++++++- .../org/apache/htrace/htraced/mini_htraced.go | 88 ++++++---- 4 files changed, 266 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/3377f046/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go index 07e7a2c..f9e66ce 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go @@ -31,7 +31,8 @@ import ( ) func TestClientGetServerInfo(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo", NumDataDirs: 1} + htraceBld := &MiniHTracedBuilder{Name: "TestClientGetServerInfo", + DataDirs: make([]string, 2)} ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) @@ -60,7 +61,8 @@ func createRandomTestSpans(amount int) common.SpanSlice { } func TestClientOperations(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", NumDataDirs: 2} + htraceBld := &MiniHTracedBuilder{Name: "TestClientOperations", + DataDirs: make([]string, 2)} ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) @@ -147,7 +149,8 @@ func TestClientOperations(t *testing.T) { } func TestDumpAll(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", NumDataDirs: 2} + htraceBld := &MiniHTracedBuilder{Name: "TestDumpAll", + DataDirs: make([]string, 2)} ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/3377f046/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go index f7c8ece..faf23cd 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go @@ -31,7 +31,6 @@ import ( "strconv" "strings" "sync/atomic" - "syscall" ) // @@ -47,7 +46,7 @@ import ( // for serialization. We assume that there will be many more writes than reads. // // Schema -// m -> dataStoreMetadata +// m -> dataStoreVersion // s[8-byte-big-endian-sid] -> SpanData // b[8-byte-big-endian-begin-time][8-byte-big-endian-child-sid] -> {} // e[8-byte-big-endian-end-time][8-byte-big-endian-child-sid] -> {} @@ -63,10 +62,12 @@ import ( // the signed fields. // -const DATA_STORE_VERSION = 1 +const UNKNOWN_LAYOUT_VERSION = 0 +const CURRENT_LAYOUT_VERSION = 2 var EMPTY_BYTE_BUF []byte = []byte{} +const VERSION_KEY = 'v' const SPAN_ID_INDEX_PREFIX = 's' const BEGIN_TIME_INDEX_PREFIX = 'b' const END_TIME_INDEX_PREFIX = 'e' @@ -158,23 +159,6 @@ type shard struct { exited chan bool } -// Metadata about the DataStore. -type dataStoreMetadata struct { - // The DataStore version. - Version int32 -} - -// Write the metadata key to a shard. -func (shd *shard) WriteMetadata(meta *dataStoreMetadata) error { - w := new(bytes.Buffer) - encoder := gob.NewEncoder(w) - err := encoder.Encode(meta) - if err != nil { - return err - } - return shd.ldb.Put(shd.store.writeOpts, []byte("m"), w.Bytes()) -} - // Process incoming spans for a shard. func (shd *shard) processIncoming() { lg := shd.store.lg @@ -305,10 +289,11 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) - // If we return an error, close the store. var err error lg := common.NewLogger("datastore", cnf) store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans} + + // If we return an error, close the store. defer func() { if err != nil { store.Close() @@ -324,72 +309,134 @@ func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataSto // Open all shards for idx := range dirs { path := dirs[idx] + conf.PATH_SEP + "db" - err := os.MkdirAll(path, 0777) - if err != nil { - e, ok := err.(*os.PathError) - if !ok || e.Err != syscall.EEXIST { - return nil, err - } - if !clearStored { - // TODO: implement re-opening saved data - lg.Error("Error: path " + path + "already exists.") - return nil, err - } else { - err = os.RemoveAll(path) - if err != nil { - lg.Error("Failed to create " + path + ": " + err.Error()) - return nil, err - } - lg.Info("Cleared " + path) - } - } var shd *shard - shd, err = CreateShard(store, cnf, path) + shd, err = CreateShard(store, cnf, path, clearStored) if err != nil { - lg.Errorf("Error creating shard %s: %s", path, err.Error()) + lg.Errorf("Error creating shard %s: %s\n", path, err.Error()) return nil, err } store.shards = append(store.shards, shd) } - meta := &dataStoreMetadata{Version: DATA_STORE_VERSION} for idx := range store.shards { shd := store.shards[idx] - err := shd.WriteMetadata(meta) - if err != nil { - lg.Error("Failed to write metadata to " + store.shards[idx].path + ": " + err.Error()) - return nil, err - } shd.exited = make(chan bool, 1) go shd.processIncoming() } return store, nil } -func CreateShard(store *dataStore, cnf *conf.Config, path string) (*shard, error) { +func CreateShard(store *dataStore, cnf *conf.Config, path string, + clearStored bool) (*shard, error) { + lg := store.lg + if clearStored { + fi, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) + return nil, err + } + if fi != nil { + err = os.RemoveAll(path) + if err != nil { + lg.Errorf("Failed to clear existing datastore directory %s: %s\n", + path, err.Error()) + return nil, err + } + lg.Infof("Cleared existing datastore directory %s\n", path) + } + } + err := os.MkdirAll(path, 0777) + if err != nil { + lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error()) + return nil, err + } var shd *shard - //filter := levigo.NewBloomFilter(10) - //defer filter.Close() openOpts := levigo.NewOptions() defer openOpts.Close() - openOpts.SetCreateIfMissing(true) - //openOpts.SetFilterPolicy(filter) + newlyCreated := false ldb, err := levigo.Open(path, openOpts) - if err != nil { - store.lg.Errorf("LevelDB failed to open %s: %s\n", path, err.Error()) - return nil, err + if err == nil { + store.lg.Infof("LevelDB opened %s\n", path) + } else { + store.lg.Debugf("LevelDB failed to open %s: %s\n", path, err.Error()) + openOpts.SetCreateIfMissing(true) + ldb, err = levigo.Open(path, openOpts) + if err != nil { + store.lg.Errorf("LevelDB failed to create %s: %s\n", path, err.Error()) + return nil, err + } + store.lg.Infof("Created new LevelDB instance in %s\n", path) + newlyCreated = true } defer func() { if shd == nil { ldb.Close() } }() + lv, err := readLayoutVersion(store, ldb) + if err != nil { + store.lg.Errorf("Got error while reading datastore version for %s: %s\n", + path, err.Error()) + return nil, err + } + if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) { + err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION) + if err != nil { + store.lg.Errorf("Got error while writing datastore version for %s: %s\n", + path, err.Error()) + return nil, err + } + store.lg.Tracef("Wrote layout version %d to shard at %s.\n", + CURRENT_LAYOUT_VERSION, path) + } else if lv != CURRENT_LAYOUT_VERSION { + versionName := "unknown" + if lv != UNKNOWN_LAYOUT_VERSION { + versionName = fmt.Sprintf("%d", lv) + } + store.lg.Errorf("Can't read old datastore. Its layout version is %s, but this "+ + "software is at layout version %d. Please set %s to clear the datastore "+ + "on startup, or clear it manually.\n", versionName, + CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR) + return nil, errors.New(fmt.Sprintf("Invalid layout version: got %s, expected %d.", + versionName, CURRENT_LAYOUT_VERSION)) + } else { + store.lg.Tracef("Found layout version %d in %s.\n", lv, path) + } spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) shd = &shard{store: store, ldb: ldb, path: path, incoming: make(chan *common.Span, spanBufferSize)} - store.lg.Infof("LevelDB opened %s\n", path) return shd, nil } +// Read the datastore version of a leveldb instance. +func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) { + buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY}) + if err != nil { + return 0, err + } + if len(buf) == 0 { + return 0, nil + } + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + var v uint32 + err = decoder.Decode(&v) + if err != nil { + return 0, err + } + return v, nil +} + +// Write the datastore version to a shard. +func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + err := encoder.Encode(&v) + if err != nil { + return err + } + return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes()) +} + func (store *dataStore) GetStatistics() *Statistics { return store.stats.Copy() } @@ -737,7 +784,7 @@ func (src *source) populateNextFromShard(shardIdx int) { src.numRead[shardIdx]++ key := iter.Key() if !bytes.HasPrefix(key, []byte{src.keyPrefix}) { - lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s", + lg.Debugf("Can't populate: Iterator for shard %d does not have prefix %s\n", shardIdx, string(src.keyPrefix)) break // Can't read past end of indexed section } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/3377f046/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go index fd4bfb1..77497c5 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go @@ -23,15 +23,20 @@ import ( "bytes" "encoding/json" "math/rand" + htrace "org/apache/htrace/client" "org/apache/htrace/common" + "org/apache/htrace/conf" "org/apache/htrace/test" + "os" "sort" + "strings" "testing" ) // Test creating and tearing down a datastore. func TestCreateDatastore(t *testing.T) { - htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", NumDataDirs: 3} + htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", + DataDirs: make([]string, 3)} ht, err := htraceBld.Build() if err != nil { t.Fatalf("failed to create datastore: %s", err.Error()) @@ -342,3 +347,98 @@ func BenchmarkDatastoreWrites(b *testing.B) { b.N, spansWritten) } } + +func TestReloadDataStore(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", + DataDirs: make([]string, 2), KeepDataDirsOnClose: true} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + dataDirs := make([]string, len(ht.DataDirs)) + copy(dataDirs, ht.DataDirs) + defer func() { + if ht != nil { + ht.Close() + } + for i := range dataDirs { + os.RemoveAll(dataDirs[i]) + } + }() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf()) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + + // Create some random trace spans. + NUM_TEST_SPANS := 5 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + for i := 0; i < NUM_TEST_SPANS; i++ { + if err := hcl.WriteSpan(allSpans[i]); err != nil { + t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error()) + } + } + + // Look up the spans we wrote. + var span *common.Span + for i := 0; i < NUM_TEST_SPANS; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + + ht.Close() + ht = nil + + htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2", + DataDirs: dataDirs, KeepDataDirsOnClose: true} + ht, err = htraceBld.Build() + if err != nil { + t.Fatalf("failed to re-create datastore: %s", err.Error()) + } + hcl, err = htrace.NewClient(ht.ClientConf()) + if err != nil { + t.Fatalf("failed to re-create client: %s", err.Error()) + } + + // Look up the spans we wrote earlier. + for i := 0; i < NUM_TEST_SPANS; i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + + // Set an old datastore version number. + for i := range ht.Store.shards { + shard := ht.Store.shards[i] + writeDataStoreVersion(ht.Store, shard.ldb, CURRENT_LAYOUT_VERSION-1) + } + ht.Close() + ht = nil + + htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3", + DataDirs: dataDirs, KeepDataDirsOnClose: true} + ht, err = htraceBld.Build() + if err == nil { + t.Fatalf("expected the datastore to fail to load after setting an " + + "incorrect version.\n") + } + if !strings.Contains(err.Error(), "Invalid layout version") { + t.Fatal(`expected the loading error to contain "invalid layout version"` + "\n") + } + + // It should work with data.store.clear set. + htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4", + DataDirs: dataDirs, KeepDataDirsOnClose: true, + Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}} + ht, err = htraceBld.Build() + if err != nil { + t.Fatalf("expected the datastore loading to succeed after setting an "+ + "incorrect version. But it failed with error %s\n", err.Error()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/3377f046/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go index fd8c2a7..be7e284 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -46,20 +46,24 @@ type MiniHTracedBuilder struct { // If ths is nil, we use the default configuration for everything. Cnf map[string]string - // The number of managed data directories to create. - // If this is 0, it defaults to DEFAULT_NUM_DATA_DIRS. - NumDataDirs int + // The DataDirs to use. Empty entries will turn into random names. + DataDirs []string + + // If true, we will keep the data dirs around after MiniHTraced#Close + KeepDataDirsOnClose bool // If non-null, the WrittenSpans channel to use when creating the DataStore. WrittenSpans chan *common.Span } type MiniHTraced struct { - Name string - Cnf *conf.Config - DataDirs []string - Store *dataStore - Rsv *RestServer + Name string + Cnf *conf.Config + DataDirs []string + Store *dataStore + Rsv *RestServer + Lg *common.Logger + KeepDataDirsOnClose bool } func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { @@ -72,39 +76,45 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { if bld.Cnf == nil { bld.Cnf = make(map[string]string) } - if bld.NumDataDirs == 0 { - bld.NumDataDirs = DEFAULT_NUM_DATA_DIRS + if bld.DataDirs == nil { + bld.DataDirs = make([]string, 2) + } + for idx := range bld.DataDirs { + if bld.DataDirs[idx] == "" { + bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(), + fmt.Sprintf("%s%d", bld.Name, idx+1)) + if err != nil { + return nil, err + } + } + } + bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] = + strings.Join(bld.DataDirs, conf.PATH_LIST_SEP) + bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server + bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE" + cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS} + cnf, err := cnfBld.Build() + if err != nil { + return nil, err } - dataDirs := make([]string, bld.NumDataDirs) + lg := common.NewLogger("mini.htraced", cnf) defer func() { if err != nil { if store != nil { store.Close() } - for idx := range dataDirs { - if dataDirs[idx] != "" { - os.RemoveAll(dataDirs[idx]) + for idx := range bld.DataDirs { + if bld.DataDirs[idx] != "" { + os.RemoveAll(bld.DataDirs[idx]) } } if rsv != nil { rsv.Close() } + lg.Infof("Failed to create MiniHTraced %s: %s\n", bld.Name, err.Error()) + lg.Close() } }() - for idx := range dataDirs { - dataDirs[idx], err = ioutil.TempDir(os.TempDir(), - fmt.Sprintf("%s%d", bld.Name, idx+1)) - if err != nil { - return nil, err - } - } - bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] = strings.Join(dataDirs, conf.PATH_LIST_SEP) - bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the REST server - cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS} - cnf, err := cnfBld.Build() - if err != nil { - return nil, err - } store, err = CreateDataStore(cnf, bld.WrittenSpans) if err != nil { return nil, err @@ -113,11 +123,15 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { if err != nil { return nil, err } + lg.Infof("Created MiniHTraced %s\n", bld.Name) return &MiniHTraced{ - Cnf: cnf, - DataDirs: dataDirs, - Store: store, - Rsv: rsv, + Name: bld.Name, + Cnf: cnf, + DataDirs: bld.DataDirs, + Store: store, + Rsv: rsv, + Lg: lg, + KeepDataDirsOnClose: bld.KeepDataDirsOnClose, }, nil } @@ -127,9 +141,15 @@ func (ht *MiniHTraced) ClientConf() *conf.Config { } func (ht *MiniHTraced) Close() { + ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name) ht.Rsv.Close() ht.Store.Close() - for idx := range ht.DataDirs { - os.RemoveAll(ht.DataDirs[idx]) + if !ht.KeepDataDirsOnClose { + for idx := range ht.DataDirs { + ht.Lg.Infof("Removing %s...\n", ht.DataDirs[idx]) + os.RemoveAll(ht.DataDirs[idx]) + } } + ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name) + ht.Lg.Close() }
