Repository: incubator-htrace Updated Branches: refs/heads/master c101f40ed -> b7f9058ca
HTRACE-314. htraced: make datastore loading safer (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/b7f9058c Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/b7f9058c Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/b7f9058c Branch: refs/heads/master Commit: b7f9058ca9d72974a3133ac2ea6f9c948ce3d539 Parents: c101f40 Author: Colin P. Mccabe <[email protected]> Authored: Tue Dec 1 21:47:58 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Fri Dec 4 11:59:28 2015 -0800 ---------------------------------------------------------------------- .../src/org/apache/htrace/common/test_util.go | 8 + .../src/org/apache/htrace/htraced/datastore.go | 245 ++------- .../org/apache/htrace/htraced/datastore_test.go | 180 +++++-- .../go/src/org/apache/htrace/htraced/loader.go | 509 +++++++++++++++++++ .../org/apache/htrace/htraced/mini_htraced.go | 6 +- 5 files changed, 692 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/common/test_util.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/test_util.go b/htrace-htraced/go/src/org/apache/htrace/common/test_util.go index ec9151b..a761525 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/test_util.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/test_util.go @@ -22,6 +22,7 @@ package common import ( "fmt" "testing" + "strings" "time" ) @@ -81,3 +82,10 @@ func TestId(str string) SpanId { } return spanId } + +func AssertErrContains(t *testing.T, err error, str string) { + if !strings.Contains(err.Error(), str) { + t.Fatalf("expected the error to contain %s, but it was %s\n", + str, err.Error()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index 828a6af..596b652 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -21,21 +21,17 @@ package main import ( "bytes" - "encoding/gob" "encoding/hex" "errors" "fmt" "github.com/jmhodges/levigo" "github.com/ugorji/go/codec" - "math" "org/apache/htrace/common" "org/apache/htrace/conf" - "os" "strconv" "strings" "sync" "sync/atomic" - "syscall" "time" ) @@ -68,13 +64,8 @@ import ( // the signed fields. // -const UNKNOWN_LAYOUT_VERSION = 0 -const CURRENT_LAYOUT_VERSION = 3 - var EMPTY_BYTE_BUF []byte = []byte{} -const VERSION_KEY = 'v' - const SPAN_ID_INDEX_PREFIX = 's' const BEGIN_TIME_INDEX_PREFIX = 'b' const END_TIME_INDEX_PREFIX = 'e' @@ -462,227 +453,49 @@ type dataStore struct { // When this datastore was started (in UTC milliseconds since the epoch) startMs int64 - - // The maximum number of open files to allow per shard. - maxFdPerShard int } func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) (*dataStore, error) { - // Get the configuration. - clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR) - dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) - dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) - - var err error - lg := common.NewLogger("datastore", cnf) - store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: writtenSpans} - - // If we return an error, close the store. - defer func() { - if err != nil { - store.Close() - store = nil - } - }() - - store.readOpts = levigo.NewReadOptions() - store.readOpts.SetFillCache(true) - store.readOpts.SetVerifyChecksums(false) - store.writeOpts = levigo.NewWriteOptions() - store.writeOpts.SetSync(false) - - // Open all shards - for idx := range dirs { - path := dirs[idx] + conf.PATH_SEP + "db" - var shd *shard - shd, err = CreateShard(store, cnf, path, clearStored) - if err != nil { - lg.Errorf("Error creating shard %s: %s\n", path, err.Error()) - return nil, err - } - store.shards = append(store.shards, shd) + dld := NewDataStoreLoader(cnf) + defer dld.Close() + err := dld.Load() + if err != nil { + dld.lg.Errorf("Error loading datastore: %s\n", err.Error()) + return nil, err } - store.msink = NewMetricsSink(cnf) - store.rpr = NewReaper(cnf) - for idx := range store.shards { - shd := store.shards[idx] - shd.heartbeats = make(chan interface{}, 1) - shd.exited.Add(1) - go shd.processIncoming() + store := &dataStore { + lg: dld.lg, + shards: make([]*shard, len(dld.shards)), + readOpts: dld.readOpts, + writeOpts: dld.writeOpts, + WrittenSpans: writtenSpans, + msink: NewMetricsSink(cnf), + hb: NewHeartbeater("DatastoreHeartbeater", + cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg), + rpr: NewReaper(cnf), + startMs: common.TimeToUnixMs(time.Now().UTC()), } - store.hb = NewHeartbeater("DatastoreHeartbeater", - cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), lg) + spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) for shdIdx := range store.shards { - shd := store.shards[shdIdx] + shd := &shard { + store: store, + ldb: dld.shards[shdIdx].ldb, + path: dld.shards[shdIdx].path, + incoming: make(chan []*IncomingSpan, spanBufferSize), + heartbeats: make(chan interface{}, 1), + } + shd.exited.Add(1) + go shd.processIncoming() + store.shards[shdIdx] = shd store.hb.AddHeartbeatTarget(&HeartbeatTarget{ name: fmt.Sprintf("shard(%s)", shd.path), targetChan: shd.heartbeats, }) } - store.startMs = common.TimeToUnixMs(time.Now().UTC()) - err = store.calculateMaxOpenFilesPerShard() - if err != nil { - lg.Warnf("Unable to calculate maximum open files per shard: %s\n", - err.Error()) - } + dld.DisownResources() return store, nil } -// The maximum number of file descriptors we'll use on non-datastore things. -const NON_DATASTORE_FD_MAX = 300 - -// The minimum number of file descriptors per shard we will set. Setting fewer -// than this number could trigger a bug in some early versions of leveldb. -const MIN_FDS_PER_SHARD = 80 - -func (store *dataStore) calculateMaxOpenFilesPerShard() error { - var rlim syscall.Rlimit - err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim) - if err != nil { - return err - } - // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems, - // but there's no harm in being careful. 'int' in golang always holds at - // least 32 bits. - var maxFd int - if rlim.Cur > uint64(math.MaxInt32) { - maxFd = math.MaxInt32 - } else { - maxFd = int(rlim.Cur) - } - fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(store.shards) - if fdsPerShard < MIN_FDS_PER_SHARD { - return errors.New(fmt.Sprintf("Expected to be able to use at least %d " + - "fds per shard, but we have %d shards and %d total fds to allocate, " + - "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD, - len(store.shards), maxFd - NON_DATASTORE_FD_MAX, fdsPerShard)) - } - store.lg.Infof("maxFd = %d. Setting maxFdPerShard = %d\n", - maxFd, fdsPerShard) - store.maxFdPerShard = fdsPerShard - return nil -} - -func CreateShard(store *dataStore, cnf *conf.Config, path string, - clearStored bool) (*shard, error) { - lg := store.lg - if clearStored { - fi, err := os.Stat(path) - if err != nil && !os.IsNotExist(err) { - lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) - return nil, err - } - if fi != nil { - err = os.RemoveAll(path) - if err != nil { - lg.Errorf("Failed to clear existing datastore directory %s: %s\n", - path, err.Error()) - return nil, err - } - lg.Infof("Cleared existing datastore directory %s\n", path) - } - } - err := os.MkdirAll(path, 0777) - if err != nil { - lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error()) - return nil, err - } - var shd *shard - openOpts := levigo.NewOptions() - openOpts.SetParanoidChecks(false) - writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE) - if writeBufferSize > 0 { - openOpts.SetWriteBufferSize(writeBufferSize) - } - if store.maxFdPerShard > 0 { - openOpts.SetMaxOpenFiles(store.maxFdPerShard) - } - defer openOpts.Close() - newlyCreated := false - ldb, err := levigo.Open(path, openOpts) - if err == nil { - store.lg.Infof("LevelDB opened %s\n", path) - } else { - store.lg.Debugf("LevelDB failed to open %s: %s\n", path, err.Error()) - openOpts.SetCreateIfMissing(true) - ldb, err = levigo.Open(path, openOpts) - if err != nil { - store.lg.Errorf("LevelDB failed to create %s: %s\n", path, err.Error()) - return nil, err - } - store.lg.Infof("Created new LevelDB instance in %s\n", path) - newlyCreated = true - } - defer func() { - if shd == nil { - ldb.Close() - } - }() - lv, err := readLayoutVersion(store, ldb) - if err != nil { - store.lg.Errorf("Got error while reading datastore version for %s: %s\n", - path, err.Error()) - return nil, err - } - if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) { - err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION) - if err != nil { - store.lg.Errorf("Got error while writing datastore version for %s: %s\n", - path, err.Error()) - return nil, err - } - store.lg.Tracef("Wrote layout version %d to shard at %s.\n", - CURRENT_LAYOUT_VERSION, path) - } else if lv != CURRENT_LAYOUT_VERSION { - versionName := "unknown" - if lv != UNKNOWN_LAYOUT_VERSION { - versionName = fmt.Sprintf("%d", lv) - } - store.lg.Errorf("Can't read old datastore. Its layout version is %s, but this "+ - "software is at layout version %d. Please set %s to clear the datastore "+ - "on startup, or clear it manually.\n", versionName, - CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR) - return nil, errors.New(fmt.Sprintf("Invalid layout version: got %s, expected %d.", - versionName, CURRENT_LAYOUT_VERSION)) - } else { - store.lg.Tracef("Found layout version %d in %s.\n", lv, path) - } - spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) - shd = &shard{store: store, ldb: ldb, path: path, - incoming: make(chan []*IncomingSpan, spanBufferSize)} - return shd, nil -} - -// Read the datastore version of a leveldb instance. -func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) { - buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY}) - if err != nil { - return 0, err - } - if len(buf) == 0 { - return 0, nil - } - r := bytes.NewBuffer(buf) - decoder := gob.NewDecoder(r) - var v uint32 - err = decoder.Decode(&v) - if err != nil { - return 0, err - } - return v, nil -} - -// Write the datastore version to a shard. -func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error { - w := new(bytes.Buffer) - encoder := gob.NewEncoder(w) - err := encoder.Encode(&v) - if err != nil { - return err - } - return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes()) -} - // Close the DataStore. func (store *dataStore) Close() { if store.hb != nil { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index ebf3c47..a693874 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -29,7 +29,6 @@ import ( "org/apache/htrace/test" "os" "sort" - "strings" "testing" "time" ) @@ -446,6 +445,55 @@ func BenchmarkDatastoreWrites(b *testing.B) { assertNumWrittenEquals(b, ht.Store.msink, b.N) } +func verifySuccessfulLoad(t *testing.T, allSpans common.SpanSlice, + dataDirs []string) { + htraceBld := &MiniHTracedBuilder{ + Name: "TestReloadDataStore#verifySuccessfulLoad", + DataDirs: dataDirs, + KeepDataDirsOnClose: true, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + defer hcl.Close() + for i := 0; i < len(allSpans); i++ { + span, err := hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } + // Look up the spans we wrote. + var span *common.Span + for i := 0; i < len(allSpans); i++ { + span, err = hcl.FindSpan(allSpans[i].Id) + if err != nil { + t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + } + common.ExpectSpansEqual(t, allSpans[i], span) + } +} + +func verifyFailedLoad(t *testing.T, dataDirs []string, expectedErr string) { + htraceBld := &MiniHTracedBuilder{ + Name: "TestReloadDataStore#verifyFailedLoad", + DataDirs: dataDirs, + KeepDataDirsOnClose: true, + } + _, err := htraceBld.Build() + if err == nil { + t.Fatalf("expected failure to load, but the load succeeded.") + } + common.AssertErrContains(t, err, expectedErr) +} + func TestReloadDataStore(t *testing.T) { htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore", Cnf: map[string]string{ @@ -474,6 +522,7 @@ func TestReloadDataStore(t *testing.T) { if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } + hcnf := ht.Cnf.Clone() // Create some random trace spans. NUM_TEST_SPANS := 5 @@ -493,57 +542,112 @@ func TestReloadDataStore(t *testing.T) { } common.ExpectSpansEqual(t, allSpans[i], span) } - + hcl.Close() ht.Close() ht = nil - htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2", - DataDirs: dataDirs, KeepDataDirsOnClose: true} - ht, err = htraceBld.Build() + // Verify that we can reload the datastore, even if we configure the data + // directories in a different order. + verifySuccessfulLoad(t, allSpans, []string{dataDirs[1], dataDirs[0]}) + + // If we try to reload the datastore with only one directory, it won't work + // (we need both). + verifyFailedLoad(t, []string{dataDirs[1]}, + "The TotalShards field of all shards is 2, but we have 1 shards.") + + // Test that we give an intelligent error message when 0 directories are + // configured. + verifyFailedLoad(t, []string{}, "No shard directories found.") + + // Can't specify the same directory more than once... will get "lock + // already held by process" + verifyFailedLoad(t, []string{dataDirs[0], dataDirs[1], dataDirs[1]}, + " already held by process.") + + // Open the datastore and modify it to have the wrong DaemonId + dld := NewDataStoreLoader(hcnf) + defer func() { + if dld != nil { + dld.Close() + dld = nil + } + }() + dld.LoadShards() + sinfo, err := dld.shards[0].readShardInfo() if err != nil { - t.Fatalf("failed to re-create datastore: %s", err.Error()) - } - hcl, err = htrace.NewClient(ht.ClientConf(), nil) + t.Fatalf("error reading shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + newDaemonId := sinfo.DaemonId + 1 + dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x\n.", + asJson(sinfo), dld.shards[0].path, newDaemonId) + sinfo.DaemonId = newDaemonId + err = dld.shards[0].writeShardInfo(sinfo) if err != nil { - t.Fatalf("failed to re-create client: %s", err.Error()) + t.Fatalf("error writing shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) } + dld.Close() + dld = nil + verifyFailedLoad(t, dataDirs, "DaemonId mismatch.") - // Look up the spans we wrote earlier. - for i := 0; i < NUM_TEST_SPANS; i++ { - span, err = hcl.FindSpan(allSpans[i].Id) + // Open the datastore and modify it to have the wrong TotalShards + dld = NewDataStoreLoader(hcnf) + dld.LoadShards() + sinfo, err = dld.shards[0].readShardInfo() + if err != nil { + t.Fatalf("error reading shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + newDaemonId = sinfo.DaemonId - 1 + dld.lg.Infof("Read %s from shard %s. Changing daemonId to 0x%016x, " + + "TotalShards to 3\n.", + asJson(sinfo), dld.shards[0].path, newDaemonId) + sinfo.DaemonId = newDaemonId + sinfo.TotalShards = 3 + err = dld.shards[0].writeShardInfo(sinfo) + if err != nil { + t.Fatalf("error writing shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) + } + dld.Close() + dld = nil + verifyFailedLoad(t, dataDirs, "TotalShards mismatch.") + + // Open the datastore and modify it to have the wrong LayoutVersion + dld = NewDataStoreLoader(hcnf) + dld.LoadShards() + for shardIdx := range(dld.shards) { + sinfo, err = dld.shards[shardIdx].readShardInfo() if err != nil { - t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error()) + t.Fatalf("error reading shard info for shard %s: %s\n", + dld.shards[shardIdx].path, err.Error()) + } + dld.lg.Infof("Read %s from shard %s. Changing TotalShards to 2, " + + "LayoutVersion to 2\n", asJson(sinfo), dld.shards[shardIdx].path) + sinfo.TotalShards = 2 + sinfo.LayoutVersion = 2 + err = dld.shards[shardIdx].writeShardInfo(sinfo) + if err != nil { + t.Fatalf("error writing shard info for shard %s: %s\n", + dld.shards[0].path, err.Error()) } - common.ExpectSpansEqual(t, allSpans[i], span) - } - - // Set an old datastore version number. - for i := range ht.Store.shards { - shard := ht.Store.shards[i] - writeDataStoreVersion(ht.Store, shard.ldb, CURRENT_LAYOUT_VERSION-1) - } - ht.Close() - ht = nil - - htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3", - DataDirs: dataDirs, KeepDataDirsOnClose: true} - ht, err = htraceBld.Build() - if err == nil { - t.Fatalf("expected the datastore to fail to load after setting an " + - "incorrect version.\n") - } - if !strings.Contains(err.Error(), "Invalid layout version") { - t.Fatal(`expected the loading error to contain "invalid layout version"` + "\n") } + dld.Close() + dld = nil + verifyFailedLoad(t, dataDirs, "The layout version of all shards is 2, " + + "but we only support") // It should work with data.store.clear set. - htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4", - DataDirs: dataDirs, KeepDataDirsOnClose: true, - Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}} + htraceBld = &MiniHTracedBuilder{ + Name: "TestReloadDataStore#clear", + DataDirs: dataDirs, + KeepDataDirsOnClose: true, + Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}, + } ht, err = htraceBld.Build() if err != nil { - t.Fatalf("expected the datastore loading to succeed after setting an "+ - "incorrect version. But it failed with error %s\n", err.Error()) + t.Fatalf("failed to create datastore: %s", err.Error()) } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go new file mode 100644 index 0000000..cb5ada7 --- /dev/null +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "errors" + "fmt" + "github.com/jmhodges/levigo" + "github.com/ugorji/go/codec" + "io" + "math" + "math/rand" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "os" + "strings" + "syscall" + "time" +) + +// Routines for loading the datastore. + +// The leveldb key which has information about the shard. +const SHARD_INFO_KEY = 'w' + +// A constant signifying that we don't know what the layout version is. +const UNKNOWN_LAYOUT_VERSION = 0 + +// The current layout version. We cannot read layout versions newer than this. +// We may sometimes be able to read older versions, but only by doing an +// upgrade. +const CURRENT_LAYOUT_VERSION = 3 + +type DataStoreLoader struct { + // The dataStore logger. + lg *common.Logger + + // True if we should clear the stored data. + ClearStored bool + + // The shards that we're loading + shards []*ShardLoader + + // The options to use for opening datastores in LevelDB. + openOpts *levigo.Options + + // The read options to use for LevelDB. + readOpts *levigo.ReadOptions + + // The write options to use for LevelDB. + writeOpts *levigo.WriteOptions +} + +// Information about a Shard. +type ShardInfo struct { + // The layout version of the datastore. + // We should always keep this field so that old software can recognize new + // layout versions, even if it can't read them. + LayoutVersion uint64 + + // A random number identifying this daemon. + DaemonId uint64 + + // The total number of shards in this datastore. + TotalShards uint32 + + // The index of this shard within the datastore. + ShardIndex uint32 +} + +// Create a new datastore loader. +// Initializes the loader, but does not load any leveldb instances. +func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader { + dld := &DataStoreLoader{ + lg: common.NewLogger("datastore", cnf), + ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR), + } + dld.readOpts = levigo.NewReadOptions() + dld.readOpts.SetFillCache(true) + dld.readOpts.SetVerifyChecksums(false) + dld.writeOpts = levigo.NewWriteOptions() + dld.writeOpts.SetSync(false) + dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) + rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) + // Filter out empty entries + dirs := make([]string, 0, len(rdirs)) + for i := range(rdirs) { + if strings.TrimSpace(rdirs[i]) != "" { + dirs = append(dirs, rdirs[i]) + } + } + dld.shards = make([]*ShardLoader, len(dirs)) + for i := range(dirs) { + dld.shards[i] = &ShardLoader{ + dld: dld, + path: dirs[i] + conf.PATH_SEP + "db", + } + } + dld.openOpts = levigo.NewOptions() + dld.openOpts.SetParanoidChecks(false) + writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE) + if writeBufferSize > 0 { + dld.openOpts.SetWriteBufferSize(writeBufferSize) + } + maxFdPerShard := dld.calculateMaxOpenFilesPerShard() + if maxFdPerShard > 0 { + dld.openOpts.SetMaxOpenFiles(maxFdPerShard) + } + return dld +} + +func (dld *DataStoreLoader) Close() { + if dld.lg != nil { + dld.lg.Close() + dld.lg = nil + } + if dld.openOpts != nil { + dld.openOpts.Close() + dld.openOpts = nil + } + if dld.readOpts != nil { + dld.readOpts.Close() + dld.readOpts = nil + } + if dld.writeOpts != nil { + dld.writeOpts.Close() + dld.writeOpts = nil + } + if dld.shards != nil { + for i := range(dld.shards) { + if dld.shards[i] != nil { + dld.shards[i].Close() + } + } + dld.shards = nil + } +} + +func (dld *DataStoreLoader) DisownResources() { + dld.lg = nil + dld.openOpts = nil + dld.readOpts = nil + dld.writeOpts = nil + dld.shards = nil +} + +// The maximum number of file descriptors we'll use on non-datastore things. +const NON_DATASTORE_FD_MAX = 300 + +// The minimum number of file descriptors per shard we will set. Setting fewer +// than this number could trigger a bug in some early versions of leveldb. +const MIN_FDS_PER_SHARD = 80 + +func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int { + var rlim syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim) + if err != nil { + dld.lg.Warnf("Unable to calculate maximum open files per shard: " + + "getrlimit failed: %s\n", err.Error()) + return 0 + } + // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems, + // but there's no harm in being careful. 'int' in golang always holds at + // least 32 bits. + var maxFd int + if rlim.Cur > uint64(math.MaxInt32) { + maxFd = math.MaxInt32 + } else { + maxFd = int(rlim.Cur) + } + if len(dld.shards) == 0 { + dld.lg.Warnf("Unable to calculate maximum open files per shard, " + + "since there are 0 shards configured.\n") + return 0 + } + fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards) + if fdsPerShard < MIN_FDS_PER_SHARD { + dld.lg.Warnf("Expected to be able to use at least %d " + + "fds per shard, but we have %d shards and %d total fds to allocate, " + + "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD, + len(dld.shards), maxFd - NON_DATASTORE_FD_MAX, fdsPerShard) + return 0 + } + dld.lg.Infof("maxFd = %d. Setting maxFdPerShard = %d\n", + maxFd, fdsPerShard) + return fdsPerShard +} + +// Load information about all shards. +func (dld *DataStoreLoader) LoadShards() { + for i := range(dld.shards) { + shd := dld.shards[i] + shd.load() + } +} + +// Verify that the shard infos are consistent. +// Reorders the shardInfo structures based on their ShardIndex. +func (dld *DataStoreLoader) VerifyShardInfos() error { + if len(dld.shards) < 1 { + return errors.New("No shard directories found.") + } + // Make sure no shards had errors. + for i := range(dld.shards) { + shd := dld.shards[i] + if shd.infoErr != nil { + return shd.infoErr + } + } + // Make sure that if any shards are empty, all shards are empty. + emptyShards := "" + prefix := "" + for i := range(dld.shards) { + if dld.shards[i].info == nil { + emptyShards = prefix + dld.shards[i].path + prefix = ", " + } + } + if emptyShards != "" { + for i := range(dld.shards) { + if dld.shards[i].info != nil { + return errors.New(fmt.Sprintf("Shards %s were empty, but " + + "the other shards had data.", emptyShards)) + } + } + // All shards are empty. + return nil + } + // Make sure that all shards have the same layout version, daemonId, and number of total + // shards. + layoutVersion := dld.shards[0].info.LayoutVersion + daemonId := dld.shards[0].info.DaemonId + totalShards := dld.shards[0].info.TotalShards + for i := 1; i < len(dld.shards); i++ { + shd := dld.shards[i] + if layoutVersion != shd.info.LayoutVersion { + return errors.New(fmt.Sprintf("Layout version mismatch. Shard " + + "%s has layout version 0x%016x, but shard %s has layout " + + "version 0x%016x.", + dld.shards[0].path, layoutVersion, shd.path, shd.info.LayoutVersion)) + } + if daemonId != shd.info.DaemonId { + return errors.New(fmt.Sprintf("DaemonId mismatch. Shard %s has " + + "daemonId 0x%016x, but shard %s has daemonId 0x%016x.", + dld.shards[0].path, daemonId, shd.path, shd.info.DaemonId)) + } + if totalShards != shd.info.TotalShards { + return errors.New(fmt.Sprintf("TotalShards mismatch. Shard %s has " + + "TotalShards = %d, but shard %s has TotalShards = %d.", + dld.shards[0].path, totalShards, shd.path, shd.info.TotalShards)) + } + if shd.info.ShardIndex >= totalShards { + return errors.New(fmt.Sprintf("Invalid ShardIndex. Shard %s has " + + "ShardIndex = %d, but TotalShards = %d.", + shd.path, shd.info.ShardIndex, shd.info.TotalShards)) + } + } + if layoutVersion != CURRENT_LAYOUT_VERSION { + return errors.New(fmt.Sprintf("The layout version of all shards " + + "is %d, but we only support version %d.", + layoutVersion, CURRENT_LAYOUT_VERSION)) + } + if totalShards != uint32(len(dld.shards)) { + return errors.New(fmt.Sprintf("The TotalShards field of all shards " + + "is %d, but we have %d shards.", totalShards, len(dld.shards))) + } + // Reorder shards in order of their ShardIndex. + reorderedShards := make([]*ShardLoader, len(dld.shards)) + for i := 0; i < len(dld.shards); i++ { + shd := dld.shards[i] + shardIdx := shd.info.ShardIndex + if reorderedShards[shardIdx] != nil { + return errors.New(fmt.Sprintf("Both shard %s and " + + "shard %s have ShardIndex %d.", shd.path, + reorderedShards[shardIdx].path, shardIdx)) + } + reorderedShards[shardIdx] = shd + } + dld.shards = reorderedShards + return nil +} + +func (dld *DataStoreLoader) Load() error { + var err error + // If data.store.clear was set, clear existing data. + if dld.ClearStored { + err = dld.clearStored() + if err != nil { + return err + } + } + // Make sure the shard directories exist in all cases, with a mkdir -p + for i := range dld.shards { + err := os.MkdirAll(dld.shards[i].path, 0777) + if err != nil { + return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): %s", + dld.shards[i].path, err.Error())) + } + } + // Get information about each shard, and verify them. + dld.LoadShards() + err = dld.VerifyShardInfos() + if err != nil { + return err + } + if dld.shards[0].ldb != nil { + dld.lg.Infof("Loaded %d leveldb instances with " + + "DaemonId of 0x%016x\n", len(dld.shards), + dld.shards[0].info.DaemonId) + } else { + // Create leveldb instances if needed. + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + daemonId := uint64(rnd.Int63()) + dld.lg.Infof("Initializing %d leveldb instances with a new " + + "DaemonId of 0x%016x\n", len(dld.shards), daemonId) + dld.openOpts.SetCreateIfMissing(true) + for i := range(dld.shards) { + shd := dld.shards[i] + shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts) + if err != nil { + return errors.New(fmt.Sprintf("levigo.Open(%s) failed to " + + "create the shard: %s", shd.path, err.Error())) + } + info := &ShardInfo { + LayoutVersion: CURRENT_LAYOUT_VERSION, + DaemonId: daemonId, + TotalShards: uint32(len(dld.shards)), + ShardIndex: uint32(i), + } + err = shd.writeShardInfo(info) + if err != nil { + return errors.New(fmt.Sprintf("levigo.Open(%s) failed to " + + "write shard info: %s", shd.path, err.Error())) + } + dld.lg.Infof("Shard %s initialized with ShardInfo %s \n", + shd.path, asJson(info)) + } + } + return nil +} + +func (dld *DataStoreLoader) clearStored() error { + for i := range dld.shards { + path := dld.shards[i].path + fi, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + dld.lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) + return err + } + if fi != nil { + err = os.RemoveAll(path) + if err != nil { + dld.lg.Errorf("Failed to clear existing datastore directory %s: %s\n", + path, err.Error()) + return err + } + dld.lg.Infof("Cleared existing datastore directory %s\n", path) + } + } + return nil +} + +type ShardLoader struct { + // The parent DataStoreLoader + dld *DataStoreLoader + + // Path to the shard + path string + + // Leveldb instance of the shard + ldb *levigo.DB + + // Information about the shard + info *ShardInfo + + // If non-null, the error we encountered trying to load the shard info. + infoErr error +} + +func (shd *ShardLoader) Close() { + if shd.ldb != nil { + shd.ldb.Close() + shd.ldb = nil + } +} + +// Load information about a particular shard. +func (shd *ShardLoader) load() { + shd.info = nil + fi, err := os.Stat(shd.path) + if err != nil { + if os.IsNotExist(err) { + shd.infoErr = nil + return + } + shd.infoErr = errors.New(fmt.Sprintf( + "stat() error on leveldb directory " + + "%s: %s", shd.path, err.Error())) + return + } + if !fi.Mode().IsDir() { + shd.infoErr = errors.New(fmt.Sprintf( + "stat() error on leveldb directory " + + "%s: inode is not directory.", shd.path)) + return + } + var dbDir *os.File + dbDir, err = os.Open(shd.path) + if err != nil { + shd.infoErr = errors.New(fmt.Sprintf( + "open() error on leveldb directory " + + "%s: %s.", shd.path, err.Error())) + return + } + defer func() { + if dbDir != nil { + dbDir.Close() + } + }() + _, err = dbDir.Readdirnames(1) + if err != nil { + if err == io.EOF { + // The db directory is empty. + shd.infoErr = nil + return + } + shd.infoErr = errors.New(fmt.Sprintf( + "Readdirnames() error on leveldb directory " + + "%s: %s.", shd.path, err.Error())) + return + } + dbDir.Close() + dbDir = nil + shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts) + if err != nil { + shd.ldb = nil + shd.infoErr = errors.New(fmt.Sprintf( + "levigo.Open() error on leveldb directory " + + "%s: %s.", shd.path, err.Error())) + return + } + shd.info, err = shd.readShardInfo() + if err != nil { + shd.infoErr = err + return + } + shd.infoErr = nil +} + +func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) { + buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY}) + if err != nil { + return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed to " + + "read shard info key: %s", shd.path, err.Error())) + } + if len(buf) == 0 { + return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got zero-" + + "length value for shard info key.", shd.path)) + } + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + r := bytes.NewBuffer(buf) + decoder := codec.NewDecoder(r, mh) + shardInfo := &ShardInfo { + LayoutVersion: UNKNOWN_LAYOUT_VERSION, + } + err = decoder.Decode(shardInfo) + if err != nil { + return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack " + + "decoding failed for shard info key: %s", shd.path, err.Error())) + } + return shardInfo, nil +} + +func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error { + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + w := new(bytes.Buffer) + enc := codec.NewEncoder(w, mh) + err := enc.Encode(info) + if err != nil { + return errors.New(fmt.Sprintf("msgpack encoding error: %s", + err.Error())) + } + err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes()) + if err != nil { + return errors.New(fmt.Sprintf("leveldb write error: %s", + err.Error())) + } + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go index 667a17b..cf7ef67 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -115,8 +115,10 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { store.Close() } for idx := range bld.DataDirs { - if bld.DataDirs[idx] != "" { - os.RemoveAll(bld.DataDirs[idx]) + if !bld.KeepDataDirsOnClose { + if bld.DataDirs[idx] != "" { + os.RemoveAll(bld.DataDirs[idx]) + } } } if rsv != nil {
