Repository: incubator-htrace
Updated Branches:
  refs/heads/master c101f40ed -> b7f9058ca


HTRACE-314. htraced: make datastore loading safer (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/b7f9058c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/b7f9058c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/b7f9058c

Branch: refs/heads/master
Commit: b7f9058ca9d72974a3133ac2ea6f9c948ce3d539
Parents: c101f40
Author: Colin P. Mccabe <[email protected]>
Authored: Tue Dec 1 21:47:58 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Fri Dec 4 11:59:28 2015 -0800

----------------------------------------------------------------------
 .../src/org/apache/htrace/common/test_util.go   |   8 +
 .../src/org/apache/htrace/htraced/datastore.go  | 245 ++-------
 .../org/apache/htrace/htraced/datastore_test.go | 180 +++++--
 .../go/src/org/apache/htrace/htraced/loader.go  | 509 +++++++++++++++++++
 .../org/apache/htrace/htraced/mini_htraced.go   |   6 +-
 5 files changed, 692 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/common/test_util.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/test_util.go 
b/htrace-htraced/go/src/org/apache/htrace/common/test_util.go
index ec9151b..a761525 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/test_util.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/test_util.go
@@ -22,6 +22,7 @@ package common
 import (
        "fmt"
        "testing"
+       "strings"
        "time"
 )
 
@@ -81,3 +82,10 @@ func TestId(str string) SpanId {
        }
        return spanId
 }
+
+func AssertErrContains(t *testing.T, err error, str string) {
+       if !strings.Contains(err.Error(), str) {
+               t.Fatalf("expected the error to contain %s, but it was %s\n",
+                       str, err.Error())
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 828a6af..596b652 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -21,21 +21,17 @@ package main
 
 import (
        "bytes"
-       "encoding/gob"
        "encoding/hex"
        "errors"
        "fmt"
        "github.com/jmhodges/levigo"
        "github.com/ugorji/go/codec"
-       "math"
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
-       "os"
        "strconv"
        "strings"
        "sync"
        "sync/atomic"
-       "syscall"
        "time"
 )
 
@@ -68,13 +64,8 @@ import (
 // the signed fields.
 //
 
-const UNKNOWN_LAYOUT_VERSION = 0
-const CURRENT_LAYOUT_VERSION = 3
-
 var EMPTY_BYTE_BUF []byte = []byte{}
 
-const VERSION_KEY = 'v'
-
 const SPAN_ID_INDEX_PREFIX = 's'
 const BEGIN_TIME_INDEX_PREFIX = 'b'
 const END_TIME_INDEX_PREFIX = 'e'
@@ -462,227 +453,49 @@ type dataStore struct {
 
        // When this datastore was started (in UTC milliseconds since the epoch)
        startMs int64
-
-       // The maximum number of open files to allow per shard.
-       maxFdPerShard int
 }
 
 func CreateDataStore(cnf *conf.Config, writtenSpans *common.Semaphore) 
(*dataStore, error) {
-       // Get the configuration.
-       clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR)
-       dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
-       dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
-
-       var err error
-       lg := common.NewLogger("datastore", cnf)
-       store := &dataStore{lg: lg, shards: []*shard{}, WrittenSpans: 
writtenSpans}
-
-       // If we return an error, close the store.
-       defer func() {
-               if err != nil {
-                       store.Close()
-                       store = nil
-               }
-       }()
-
-       store.readOpts = levigo.NewReadOptions()
-       store.readOpts.SetFillCache(true)
-       store.readOpts.SetVerifyChecksums(false)
-       store.writeOpts = levigo.NewWriteOptions()
-       store.writeOpts.SetSync(false)
-
-       // Open all shards
-       for idx := range dirs {
-               path := dirs[idx] + conf.PATH_SEP + "db"
-               var shd *shard
-               shd, err = CreateShard(store, cnf, path, clearStored)
-               if err != nil {
-                       lg.Errorf("Error creating shard %s: %s\n", path, 
err.Error())
-                       return nil, err
-               }
-               store.shards = append(store.shards, shd)
+       dld := NewDataStoreLoader(cnf)
+       defer dld.Close()
+       err := dld.Load()
+       if err != nil {
+               dld.lg.Errorf("Error loading datastore: %s\n", err.Error())
+               return nil, err
        }
-       store.msink = NewMetricsSink(cnf)
-       store.rpr = NewReaper(cnf)
-       for idx := range store.shards {
-               shd := store.shards[idx]
-               shd.heartbeats = make(chan interface{}, 1)
-               shd.exited.Add(1)
-               go shd.processIncoming()
+       store := &dataStore {
+               lg: dld.lg,
+               shards: make([]*shard, len(dld.shards)),
+               readOpts: dld.readOpts,
+               writeOpts: dld.writeOpts,
+               WrittenSpans: writtenSpans,
+               msink: NewMetricsSink(cnf),
+               hb: NewHeartbeater("DatastoreHeartbeater",
+                       
cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), dld.lg),
+               rpr: NewReaper(cnf),
+               startMs: common.TimeToUnixMs(time.Now().UTC()),
        }
-       store.hb = NewHeartbeater("DatastoreHeartbeater",
-               cnf.GetInt64(conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS), lg)
+       spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
        for shdIdx := range store.shards {
-               shd := store.shards[shdIdx]
+               shd := &shard {
+                       store: store,
+                       ldb: dld.shards[shdIdx].ldb,
+                       path: dld.shards[shdIdx].path,
+                       incoming: make(chan []*IncomingSpan, spanBufferSize),
+                       heartbeats: make(chan interface{}, 1),
+               }
+               shd.exited.Add(1)
+               go shd.processIncoming()
+               store.shards[shdIdx] = shd
                store.hb.AddHeartbeatTarget(&HeartbeatTarget{
                        name:       fmt.Sprintf("shard(%s)", shd.path),
                        targetChan: shd.heartbeats,
                })
        }
-       store.startMs = common.TimeToUnixMs(time.Now().UTC())
-       err = store.calculateMaxOpenFilesPerShard()
-       if err != nil {
-               lg.Warnf("Unable to calculate maximum open files per shard: 
%s\n",
-                       err.Error())
-       }
+       dld.DisownResources()
        return store, nil
 }
 
-// The maximum number of file descriptors we'll use on non-datastore things.
-const NON_DATASTORE_FD_MAX = 300
-
-// The minimum number of file descriptors per shard we will set.  Setting fewer
-// than this number could trigger a bug in some early versions of leveldb.
-const MIN_FDS_PER_SHARD = 80
-
-func (store *dataStore) calculateMaxOpenFilesPerShard() error {
-       var rlim syscall.Rlimit
-       err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim)
-       if err != nil {
-               return err
-       }
-       // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems,
-       // but there's no harm in being careful.  'int' in golang always holds 
at
-       // least 32 bits.
-       var maxFd int
-       if rlim.Cur > uint64(math.MaxInt32) {
-               maxFd = math.MaxInt32
-       } else {
-               maxFd = int(rlim.Cur)
-       }
-       fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(store.shards)
-       if fdsPerShard < MIN_FDS_PER_SHARD {
-               return errors.New(fmt.Sprintf("Expected to be able to use at 
least %d " +
-                       "fds per shard, but we have %d shards and %d total fds 
to allocate, " +
-                       "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD,
-                       len(store.shards), maxFd - NON_DATASTORE_FD_MAX, 
fdsPerShard))
-       }
-       store.lg.Infof("maxFd = %d.  Setting maxFdPerShard = %d\n",
-               maxFd, fdsPerShard)
-       store.maxFdPerShard = fdsPerShard
-       return nil
-}
-
-func CreateShard(store *dataStore, cnf *conf.Config, path string,
-       clearStored bool) (*shard, error) {
-       lg := store.lg
-       if clearStored {
-               fi, err := os.Stat(path)
-               if err != nil && !os.IsNotExist(err) {
-                       lg.Errorf("Failed to stat %s: %s\n", path, err.Error())
-                       return nil, err
-               }
-               if fi != nil {
-                       err = os.RemoveAll(path)
-                       if err != nil {
-                               lg.Errorf("Failed to clear existing datastore 
directory %s: %s\n",
-                                       path, err.Error())
-                               return nil, err
-                       }
-                       lg.Infof("Cleared existing datastore directory %s\n", 
path)
-               }
-       }
-       err := os.MkdirAll(path, 0777)
-       if err != nil {
-               lg.Errorf("Failed to MkdirAll(%s): %s\n", path, err.Error())
-               return nil, err
-       }
-       var shd *shard
-       openOpts := levigo.NewOptions()
-       openOpts.SetParanoidChecks(false)
-       writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE)
-       if writeBufferSize > 0 {
-               openOpts.SetWriteBufferSize(writeBufferSize)
-       }
-       if store.maxFdPerShard > 0 {
-               openOpts.SetMaxOpenFiles(store.maxFdPerShard)
-       }
-       defer openOpts.Close()
-       newlyCreated := false
-       ldb, err := levigo.Open(path, openOpts)
-       if err == nil {
-               store.lg.Infof("LevelDB opened %s\n", path)
-       } else {
-               store.lg.Debugf("LevelDB failed to open %s: %s\n", path, 
err.Error())
-               openOpts.SetCreateIfMissing(true)
-               ldb, err = levigo.Open(path, openOpts)
-               if err != nil {
-                       store.lg.Errorf("LevelDB failed to create %s: %s\n", 
path, err.Error())
-                       return nil, err
-               }
-               store.lg.Infof("Created new LevelDB instance in %s\n", path)
-               newlyCreated = true
-       }
-       defer func() {
-               if shd == nil {
-                       ldb.Close()
-               }
-       }()
-       lv, err := readLayoutVersion(store, ldb)
-       if err != nil {
-               store.lg.Errorf("Got error while reading datastore version for 
%s: %s\n",
-                       path, err.Error())
-               return nil, err
-       }
-       if newlyCreated && (lv == UNKNOWN_LAYOUT_VERSION) {
-               err = writeDataStoreVersion(store, ldb, CURRENT_LAYOUT_VERSION)
-               if err != nil {
-                       store.lg.Errorf("Got error while writing datastore 
version for %s: %s\n",
-                               path, err.Error())
-                       return nil, err
-               }
-               store.lg.Tracef("Wrote layout version %d to shard at %s.\n",
-                       CURRENT_LAYOUT_VERSION, path)
-       } else if lv != CURRENT_LAYOUT_VERSION {
-               versionName := "unknown"
-               if lv != UNKNOWN_LAYOUT_VERSION {
-                       versionName = fmt.Sprintf("%d", lv)
-               }
-               store.lg.Errorf("Can't read old datastore.  Its layout version 
is %s, but this "+
-                       "software is at layout version %d.  Please set %s to 
clear the datastore "+
-                       "on startup, or clear it manually.\n", versionName,
-                       CURRENT_LAYOUT_VERSION, conf.HTRACE_DATA_STORE_CLEAR)
-               return nil, errors.New(fmt.Sprintf("Invalid layout version: got 
%s, expected %d.",
-                       versionName, CURRENT_LAYOUT_VERSION))
-       } else {
-               store.lg.Tracef("Found layout version %d in %s.\n", lv, path)
-       }
-       spanBufferSize := cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE)
-       shd = &shard{store: store, ldb: ldb, path: path,
-               incoming: make(chan []*IncomingSpan, spanBufferSize)}
-       return shd, nil
-}
-
-// Read the datastore version of a leveldb instance.
-func readLayoutVersion(store *dataStore, ldb *levigo.DB) (uint32, error) {
-       buf, err := ldb.Get(store.readOpts, []byte{VERSION_KEY})
-       if err != nil {
-               return 0, err
-       }
-       if len(buf) == 0 {
-               return 0, nil
-       }
-       r := bytes.NewBuffer(buf)
-       decoder := gob.NewDecoder(r)
-       var v uint32
-       err = decoder.Decode(&v)
-       if err != nil {
-               return 0, err
-       }
-       return v, nil
-}
-
-// Write the datastore version to a shard.
-func writeDataStoreVersion(store *dataStore, ldb *levigo.DB, v uint32) error {
-       w := new(bytes.Buffer)
-       encoder := gob.NewEncoder(w)
-       err := encoder.Encode(&v)
-       if err != nil {
-               return err
-       }
-       return ldb.Put(store.writeOpts, []byte{VERSION_KEY}, w.Bytes())
-}
-
 // Close the DataStore.
 func (store *dataStore) Close() {
        if store.hb != nil {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index ebf3c47..a693874 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -29,7 +29,6 @@ import (
        "org/apache/htrace/test"
        "os"
        "sort"
-       "strings"
        "testing"
        "time"
 )
@@ -446,6 +445,55 @@ func BenchmarkDatastoreWrites(b *testing.B) {
        assertNumWrittenEquals(b, ht.Store.msink, b.N)
 }
 
+func verifySuccessfulLoad(t *testing.T, allSpans common.SpanSlice,
+               dataDirs []string) {
+       htraceBld := &MiniHTracedBuilder{
+               Name: "TestReloadDataStore#verifySuccessfulLoad",
+               DataDirs: dataDirs,
+               KeepDataDirsOnClose: true,
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       defer hcl.Close()
+       for i := 0; i < len(allSpans); i++ {
+               span, err := hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+       // Look up the spans we wrote.
+       var span *common.Span
+       for i := 0; i < len(allSpans); i++ {
+               span, err = hcl.FindSpan(allSpans[i].Id)
+               if err != nil {
+                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+               }
+               common.ExpectSpansEqual(t, allSpans[i], span)
+       }
+}
+
+func verifyFailedLoad(t *testing.T, dataDirs []string, expectedErr string) {
+       htraceBld := &MiniHTracedBuilder{
+               Name: "TestReloadDataStore#verifyFailedLoad",
+               DataDirs: dataDirs,
+               KeepDataDirsOnClose: true,
+       }
+       _, err := htraceBld.Build()
+       if err == nil {
+               t.Fatalf("expected failure to load, but the load succeeded.")
+       }
+       common.AssertErrContains(t, err, expectedErr)
+}
+
 func TestReloadDataStore(t *testing.T) {
        htraceBld := &MiniHTracedBuilder{Name: "TestReloadDataStore",
                Cnf: map[string]string{
@@ -474,6 +522,7 @@ func TestReloadDataStore(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
+       hcnf := ht.Cnf.Clone()
 
        // Create some random trace spans.
        NUM_TEST_SPANS := 5
@@ -493,57 +542,112 @@ func TestReloadDataStore(t *testing.T) {
                }
                common.ExpectSpansEqual(t, allSpans[i], span)
        }
-
+       hcl.Close()
        ht.Close()
        ht = nil
 
-       htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore2",
-               DataDirs: dataDirs, KeepDataDirsOnClose: true}
-       ht, err = htraceBld.Build()
+       // Verify that we can reload the datastore, even if we configure the 
data
+       // directories in a different order.
+       verifySuccessfulLoad(t, allSpans, []string{dataDirs[1], dataDirs[0]})
+
+       // If we try to reload the datastore with only one directory, it won't 
work
+       // (we need both).
+       verifyFailedLoad(t, []string{dataDirs[1]},
+               "The TotalShards field of all shards is 2, but we have 1 
shards.")
+
+       // Test that we give an intelligent error message when 0 directories are
+       // configured.
+       verifyFailedLoad(t, []string{}, "No shard directories found.")
+
+       // Can't specify the same directory more than once... will get "lock
+       // already held by process"
+       verifyFailedLoad(t, []string{dataDirs[0], dataDirs[1], dataDirs[1]},
+               " already held by process.")
+
+       // Open the datastore and modify it to have the wrong DaemonId
+       dld := NewDataStoreLoader(hcnf)
+       defer func() {
+               if dld != nil {
+                       dld.Close()
+                       dld = nil
+               }
+       }()
+       dld.LoadShards()
+       sinfo, err := dld.shards[0].readShardInfo()
        if err != nil {
-               t.Fatalf("failed to re-create datastore: %s", err.Error())
-       }
-       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+               t.Fatalf("error reading shard info for shard %s: %s\n",
+                       dld.shards[0].path, err.Error())
+       }
+       newDaemonId := sinfo.DaemonId + 1
+       dld.lg.Infof("Read %s from shard %s.  Changing daemonId to 0x%016x\n.",
+               asJson(sinfo), dld.shards[0].path, newDaemonId)
+       sinfo.DaemonId = newDaemonId
+       err = dld.shards[0].writeShardInfo(sinfo)
        if err != nil {
-               t.Fatalf("failed to re-create client: %s", err.Error())
+               t.Fatalf("error writing shard info for shard %s: %s\n",
+                       dld.shards[0].path, err.Error())
        }
+       dld.Close()
+       dld = nil
+       verifyFailedLoad(t, dataDirs, "DaemonId mismatch.")
 
-       // Look up the spans we wrote earlier.
-       for i := 0; i < NUM_TEST_SPANS; i++ {
-               span, err = hcl.FindSpan(allSpans[i].Id)
+       // Open the datastore and modify it to have the wrong TotalShards
+       dld = NewDataStoreLoader(hcnf)
+       dld.LoadShards()
+       sinfo, err = dld.shards[0].readShardInfo()
+       if err != nil {
+               t.Fatalf("error reading shard info for shard %s: %s\n",
+                       dld.shards[0].path, err.Error())
+       }
+       newDaemonId = sinfo.DaemonId - 1
+       dld.lg.Infof("Read %s from shard %s.  Changing daemonId to 0x%016x, " +
+               "TotalShards to 3\n.",
+               asJson(sinfo), dld.shards[0].path, newDaemonId)
+       sinfo.DaemonId = newDaemonId
+       sinfo.TotalShards = 3
+       err = dld.shards[0].writeShardInfo(sinfo)
+       if err != nil {
+               t.Fatalf("error writing shard info for shard %s: %s\n",
+                       dld.shards[0].path, err.Error())
+       }
+       dld.Close()
+       dld = nil
+       verifyFailedLoad(t, dataDirs, "TotalShards mismatch.")
+
+       // Open the datastore and modify it to have the wrong LayoutVersion
+       dld = NewDataStoreLoader(hcnf)
+       dld.LoadShards()
+       for shardIdx := range(dld.shards) {
+               sinfo, err = dld.shards[shardIdx].readShardInfo()
                if err != nil {
-                       t.Fatalf("FindSpan(%d) failed: %s\n", i, err.Error())
+                       t.Fatalf("error reading shard info for shard %s: %s\n",
+                               dld.shards[shardIdx].path, err.Error())
+               }
+               dld.lg.Infof("Read %s from shard %s.  Changing TotalShards to 
2, " +
+                       "LayoutVersion to 2\n", asJson(sinfo), 
dld.shards[shardIdx].path)
+               sinfo.TotalShards = 2
+               sinfo.LayoutVersion = 2
+               err = dld.shards[shardIdx].writeShardInfo(sinfo)
+               if err != nil {
+                       t.Fatalf("error writing shard info for shard %s: %s\n",
+                               dld.shards[0].path, err.Error())
                }
-               common.ExpectSpansEqual(t, allSpans[i], span)
-       }
-
-       // Set an old datastore version number.
-       for i := range ht.Store.shards {
-               shard := ht.Store.shards[i]
-               writeDataStoreVersion(ht.Store, shard.ldb, 
CURRENT_LAYOUT_VERSION-1)
-       }
-       ht.Close()
-       ht = nil
-
-       htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore3",
-               DataDirs: dataDirs, KeepDataDirsOnClose: true}
-       ht, err = htraceBld.Build()
-       if err == nil {
-               t.Fatalf("expected the datastore to fail to load after setting 
an " +
-                       "incorrect version.\n")
-       }
-       if !strings.Contains(err.Error(), "Invalid layout version") {
-               t.Fatal(`expected the loading error to contain "invalid layout 
version"` + "\n")
        }
+       dld.Close()
+       dld = nil
+       verifyFailedLoad(t, dataDirs, "The layout version of all shards is 2, " 
+
+               "but we only support")
 
        // It should work with data.store.clear set.
-       htraceBld = &MiniHTracedBuilder{Name: "TestReloadDataStore4",
-               DataDirs: dataDirs, KeepDataDirsOnClose: true,
-               Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"}}
+       htraceBld = &MiniHTracedBuilder{
+               Name: "TestReloadDataStore#clear",
+               DataDirs: dataDirs,
+               KeepDataDirsOnClose: true,
+               Cnf: map[string]string{conf.HTRACE_DATA_STORE_CLEAR: "true"},
+       }
        ht, err = htraceBld.Build()
        if err != nil {
-               t.Fatalf("expected the datastore loading to succeed after 
setting an "+
-                       "incorrect version.  But it failed with error %s\n", 
err.Error())
+               t.Fatalf("failed to create datastore: %s", err.Error())
        }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go
new file mode 100644
index 0000000..cb5ada7
--- /dev/null
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/loader.go
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "github.com/jmhodges/levigo"
+       "github.com/ugorji/go/codec"
+       "io"
+       "math"
+       "math/rand"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+       "os"
+       "strings"
+       "syscall"
+       "time"
+)
+
+// Routines for loading the datastore.
+
+// The leveldb key which has information about the shard.
+const SHARD_INFO_KEY = 'w'
+
+// A constant signifying that we don't know what the layout version is.
+const UNKNOWN_LAYOUT_VERSION = 0
+
+// The current layout version.  We cannot read layout versions newer than this.
+// We may sometimes be able to read older versions, but only by doing an
+// upgrade.
+const CURRENT_LAYOUT_VERSION = 3
+
+type DataStoreLoader struct {
+       // The dataStore logger.
+       lg *common.Logger
+
+       // True if we should clear the stored data.
+       ClearStored bool
+
+       // The shards that we're loading
+       shards []*ShardLoader
+
+       // The options to use for opening datastores in LevelDB.
+       openOpts *levigo.Options
+
+       // The read options to use for LevelDB.
+       readOpts *levigo.ReadOptions
+
+       // The write options to use for LevelDB.
+       writeOpts *levigo.WriteOptions
+}
+
+// Information about a Shard.
+type ShardInfo struct {
+       // The layout version of the datastore.
+       // We should always keep this field so that old software can recognize 
new
+       // layout versions, even if it can't read them.
+       LayoutVersion uint64
+
+       // A random number identifying this daemon.
+       DaemonId uint64
+
+       // The total number of shards in this datastore.
+       TotalShards uint32
+
+       // The index of this shard within the datastore.
+       ShardIndex uint32
+}
+
+// Create a new datastore loader. 
+// Initializes the loader, but does not load any leveldb instances.
+func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader {
+       dld := &DataStoreLoader{
+               lg: common.NewLogger("datastore", cnf),
+               ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR),
+       }
+       dld.readOpts = levigo.NewReadOptions()
+       dld.readOpts.SetFillCache(true)
+       dld.readOpts.SetVerifyChecksums(false)
+       dld.writeOpts = levigo.NewWriteOptions()
+       dld.writeOpts.SetSync(false)
+       dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES)
+       rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP)
+       // Filter out empty entries
+       dirs := make([]string, 0, len(rdirs))
+       for i := range(rdirs) {
+               if strings.TrimSpace(rdirs[i]) != "" {
+                       dirs = append(dirs, rdirs[i])
+               }
+       }
+       dld.shards = make([]*ShardLoader, len(dirs))
+       for i := range(dirs) {
+               dld.shards[i] = &ShardLoader{
+                       dld: dld,
+                       path: dirs[i] + conf.PATH_SEP + "db",
+               }
+       }
+       dld.openOpts = levigo.NewOptions()
+       dld.openOpts.SetParanoidChecks(false)
+       writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE)
+       if writeBufferSize > 0 {
+               dld.openOpts.SetWriteBufferSize(writeBufferSize)
+       }
+       maxFdPerShard := dld.calculateMaxOpenFilesPerShard()
+       if maxFdPerShard > 0 {
+               dld.openOpts.SetMaxOpenFiles(maxFdPerShard)
+       }
+       return dld
+}
+
+func (dld *DataStoreLoader) Close() {
+       if dld.lg != nil {
+               dld.lg.Close()
+               dld.lg = nil
+       }
+       if dld.openOpts != nil {
+               dld.openOpts.Close()
+               dld.openOpts = nil
+       }
+       if dld.readOpts != nil {
+               dld.readOpts.Close()
+               dld.readOpts = nil
+       }
+       if dld.writeOpts != nil {
+               dld.writeOpts.Close()
+               dld.writeOpts = nil
+       }
+       if dld.shards != nil {
+               for i := range(dld.shards) {
+                       if dld.shards[i] != nil {
+                               dld.shards[i].Close()
+                       }
+               }
+               dld.shards = nil
+       }
+}
+
+func (dld *DataStoreLoader) DisownResources() {
+       dld.lg = nil
+       dld.openOpts = nil
+       dld.readOpts = nil
+       dld.writeOpts = nil
+       dld.shards = nil
+}
+
+// The maximum number of file descriptors we'll use on non-datastore things.
+const NON_DATASTORE_FD_MAX = 300
+
+// The minimum number of file descriptors per shard we will set.  Setting fewer
+// than this number could trigger a bug in some early versions of leveldb.
+const MIN_FDS_PER_SHARD = 80
+
+func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int {
+       var rlim syscall.Rlimit
+       err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim)
+       if err != nil {
+               dld.lg.Warnf("Unable to calculate maximum open files per shard: 
" +
+                       "getrlimit failed: %s\n", err.Error())
+               return 0
+       }
+       // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems,
+       // but there's no harm in being careful.  'int' in golang always holds 
at
+       // least 32 bits.
+       var maxFd int
+       if rlim.Cur > uint64(math.MaxInt32) {
+               maxFd = math.MaxInt32
+       } else {
+               maxFd = int(rlim.Cur)
+       }
+       if len(dld.shards) == 0 {
+               dld.lg.Warnf("Unable to calculate maximum open files per shard, 
" +
+                       "since there are 0 shards configured.\n")
+               return 0
+       }
+       fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards)
+       if fdsPerShard < MIN_FDS_PER_SHARD {
+               dld.lg.Warnf("Expected to be able to use at least %d " +
+                       "fds per shard, but we have %d shards and %d total fds 
to allocate, " +
+                       "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD,
+                       len(dld.shards), maxFd - NON_DATASTORE_FD_MAX, 
fdsPerShard)
+               return 0
+       }
+       dld.lg.Infof("maxFd = %d.  Setting maxFdPerShard = %d\n",
+               maxFd, fdsPerShard)
+       return fdsPerShard
+}
+
+// Load information about all shards.
+func (dld *DataStoreLoader) LoadShards() {
+       for i := range(dld.shards) {
+               shd := dld.shards[i]
+               shd.load()
+       }
+}
+
+// Verify that the shard infos are consistent.
+// Reorders the shardInfo structures based on their ShardIndex.
+func (dld *DataStoreLoader) VerifyShardInfos() error {
+       if len(dld.shards) < 1 {
+               return errors.New("No shard directories found.")
+       }
+       // Make sure no shards had errors.
+       for i := range(dld.shards) {
+               shd := dld.shards[i]
+               if shd.infoErr != nil {
+                       return shd.infoErr
+               }
+       }
+       // Make sure that if any shards are empty, all shards are empty.
+       emptyShards := ""
+       prefix := ""
+       for i := range(dld.shards) {
+               if dld.shards[i].info == nil {
+                       emptyShards = prefix + dld.shards[i].path
+                       prefix = ", "
+               }
+       }
+       if emptyShards != "" {
+               for i := range(dld.shards) {
+                       if dld.shards[i].info != nil {
+                               return errors.New(fmt.Sprintf("Shards %s were 
empty, but " +
+                                       "the other shards had data.", 
emptyShards))
+                       }
+               }
+               // All shards are empty.
+               return nil
+       }
+       // Make sure that all shards have the same layout version, daemonId, 
and number of total
+       // shards.
+       layoutVersion := dld.shards[0].info.LayoutVersion
+       daemonId := dld.shards[0].info.DaemonId
+       totalShards := dld.shards[0].info.TotalShards
+       for i := 1; i < len(dld.shards); i++ {
+               shd := dld.shards[i]
+               if layoutVersion != shd.info.LayoutVersion {
+                       return errors.New(fmt.Sprintf("Layout version mismatch. 
 Shard " +
+                               "%s has layout version 0x%016x, but shard %s 
has layout " +
+                               "version 0x%016x.",
+                               dld.shards[0].path, layoutVersion, shd.path, 
shd.info.LayoutVersion))
+               }
+               if daemonId != shd.info.DaemonId {
+                       return errors.New(fmt.Sprintf("DaemonId mismatch. Shard 
%s has " +
+                       "daemonId 0x%016x, but shard %s has daemonId 0x%016x.",
+                               dld.shards[0].path, daemonId, shd.path, 
shd.info.DaemonId))
+               }
+               if totalShards != shd.info.TotalShards {
+                       return errors.New(fmt.Sprintf("TotalShards mismatch.  
Shard %s has " +
+                               "TotalShards = %d, but shard %s has TotalShards 
= %d.",
+                               dld.shards[0].path, totalShards, shd.path, 
shd.info.TotalShards))
+               }
+               if shd.info.ShardIndex >= totalShards {
+                       return errors.New(fmt.Sprintf("Invalid ShardIndex.  
Shard %s has " +
+                               "ShardIndex = %d, but TotalShards = %d.",
+                               shd.path, shd.info.ShardIndex, 
shd.info.TotalShards))
+               }
+       }
+       if layoutVersion != CURRENT_LAYOUT_VERSION {
+               return errors.New(fmt.Sprintf("The layout version of all shards 
" +
+                       "is %d, but we only support version %d.",
+                       layoutVersion, CURRENT_LAYOUT_VERSION))
+       }
+       if totalShards != uint32(len(dld.shards)) {
+               return errors.New(fmt.Sprintf("The TotalShards field of all 
shards " +
+                       "is %d, but we have %d shards.", totalShards, 
len(dld.shards)))
+       }
+       // Reorder shards in order of their ShardIndex.
+       reorderedShards := make([]*ShardLoader, len(dld.shards))
+       for i := 0; i < len(dld.shards); i++ {
+               shd := dld.shards[i]
+               shardIdx := shd.info.ShardIndex
+               if reorderedShards[shardIdx] != nil {
+                       return errors.New(fmt.Sprintf("Both shard %s and " +
+                               "shard %s have ShardIndex %d.", shd.path,
+                               reorderedShards[shardIdx].path, shardIdx))
+               }
+               reorderedShards[shardIdx] = shd
+       }
+       dld.shards = reorderedShards
+       return nil
+}
+
+func (dld *DataStoreLoader) Load() error {
+       var err error
+       // If data.store.clear was set, clear existing data.
+       if dld.ClearStored {
+               err = dld.clearStored()
+               if err != nil {
+                       return err
+               }
+       }
+       // Make sure the shard directories exist in all cases, with a mkdir -p 
+       for i := range dld.shards {
+               err := os.MkdirAll(dld.shards[i].path, 0777)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): 
%s",
+                               dld.shards[i].path, err.Error()))
+               }
+       }
+       // Get information about each shard, and verify them.
+       dld.LoadShards()
+       err = dld.VerifyShardInfos()
+       if err != nil {
+               return err
+       }
+       if dld.shards[0].ldb != nil {
+               dld.lg.Infof("Loaded %d leveldb instances with " +
+                       "DaemonId of 0x%016x\n", len(dld.shards),
+                       dld.shards[0].info.DaemonId)
+       } else {
+               // Create leveldb instances if needed.
+               rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+               daemonId := uint64(rnd.Int63())
+               dld.lg.Infof("Initializing %d leveldb instances with a new " +
+                       "DaemonId of 0x%016x\n", len(dld.shards), daemonId)
+               dld.openOpts.SetCreateIfMissing(true)
+               for i := range(dld.shards) {
+                       shd := dld.shards[i]
+                       shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+                       if err != nil {
+                               return errors.New(fmt.Sprintf("levigo.Open(%s) 
failed to " +
+                                       "create the shard: %s", shd.path, 
err.Error()))
+                       }
+                       info := &ShardInfo {
+                               LayoutVersion: CURRENT_LAYOUT_VERSION,
+                               DaemonId: daemonId,
+                               TotalShards: uint32(len(dld.shards)),
+                               ShardIndex: uint32(i),
+                       }
+                       err = shd.writeShardInfo(info)
+                       if err != nil {
+                               return errors.New(fmt.Sprintf("levigo.Open(%s) 
failed to " +
+                                       "write shard info: %s", shd.path, 
err.Error()))
+                       }
+                       dld.lg.Infof("Shard %s initialized with ShardInfo %s 
\n",
+                               shd.path, asJson(info))
+               }
+       }
+       return nil
+}
+
+func (dld *DataStoreLoader) clearStored() error {
+       for i := range dld.shards {
+               path := dld.shards[i].path
+               fi, err := os.Stat(path)
+               if err != nil && !os.IsNotExist(err) {
+                       dld.lg.Errorf("Failed to stat %s: %s\n", path, 
err.Error())
+                       return err
+               }
+               if fi != nil {
+                       err = os.RemoveAll(path)
+                       if err != nil {
+                               dld.lg.Errorf("Failed to clear existing 
datastore directory %s: %s\n",
+                                       path, err.Error())
+                               return err
+                       }
+                       dld.lg.Infof("Cleared existing datastore directory 
%s\n", path)
+               }
+       }
+       return nil
+}
+
// ShardLoader loads and validates a single shard (one leveldb instance) of
// the datastore on behalf of its parent DataStoreLoader.
type ShardLoader struct {
	// The parent DataStoreLoader which owns this shard.
	dld *DataStoreLoader

	// Filesystem path to the shard's leveldb directory.
	path string

	// Leveldb instance of the shard; nil until load() successfully opens
	// it, and nil again after Close().
	ldb *levigo.DB

	// Information about the shard, decoded from its shard info record by
	// load().  Nil when the shard directory is absent or empty.
	info *ShardInfo

	// If non-nil, the error we encountered trying to load the shard info.
	infoErr error
}
+
+func (shd *ShardLoader) Close() {
+       if shd.ldb != nil {
+               shd.ldb.Close()
+               shd.ldb = nil
+       }
+}
+
+// Load information about a particular shard.
+func (shd *ShardLoader) load() {
+       shd.info = nil
+       fi, err := os.Stat(shd.path)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       shd.infoErr = nil
+                       return
+               }
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "stat() error on leveldb directory " +
+                               "%s: %s", shd.path, err.Error()))
+               return
+       }
+       if !fi.Mode().IsDir() {
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "stat() error on leveldb directory " +
+                               "%s: inode is not directory.", shd.path))
+               return
+       }
+       var dbDir *os.File
+       dbDir, err = os.Open(shd.path)
+       if err != nil {
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "open() error on leveldb directory " +
+                               "%s: %s.", shd.path, err.Error()))
+               return
+       }
+       defer func() {
+               if dbDir != nil {
+                       dbDir.Close()
+               }
+       }()
+       _, err = dbDir.Readdirnames(1)
+       if err != nil {
+               if err == io.EOF {
+                       // The db directory is empty.
+                       shd.infoErr = nil
+                       return
+               }
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "Readdirnames() error on leveldb directory " +
+                               "%s: %s.", shd.path, err.Error()))
+               return
+       }
+       dbDir.Close()
+       dbDir = nil
+       shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts)
+       if err != nil {
+               shd.ldb = nil
+               shd.infoErr = errors.New(fmt.Sprintf(
+                       "levigo.Open() error on leveldb directory " +
+                               "%s: %s.", shd.path, err.Error()))
+               return
+       }
+       shd.info, err = shd.readShardInfo()
+       if err != nil {
+               shd.infoErr = err
+               return
+       }
+       shd.infoErr = nil
+}
+
+func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) {
+       buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY})
+       if err != nil {
+               return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed 
to " +
+                       "read shard info key: %s", shd.path, err.Error()))
+       }
+       if len(buf) == 0 {
+               return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got 
zero-" +
+                       "length value for shard info key.", shd.path))
+       }
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       r := bytes.NewBuffer(buf)
+       decoder := codec.NewDecoder(r, mh)
+       shardInfo := &ShardInfo {
+               LayoutVersion: UNKNOWN_LAYOUT_VERSION,
+       }
+       err = decoder.Decode(shardInfo)
+       if err != nil {
+               return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack 
" +
+                       "decoding failed for shard info key: %s", shd.path, 
err.Error()))
+       }
+       return shardInfo, nil
+}
+
+func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error {
+       mh := new(codec.MsgpackHandle)
+       mh.WriteExt = true
+       w := new(bytes.Buffer)
+       enc := codec.NewEncoder(w, mh)
+       err := enc.Encode(info)
+       if err != nil {
+               return errors.New(fmt.Sprintf("msgpack encoding error: %s",
+                       err.Error()))
+       }
+       err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes())
+       if err != nil {
+               return errors.New(fmt.Sprintf("leveldb write error: %s",
+                       err.Error()))
+       }
+       return nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/b7f9058c/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index 667a17b..cf7ef67 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -115,8 +115,10 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, 
error) {
                                store.Close()
                        }
                        for idx := range bld.DataDirs {
-                               if bld.DataDirs[idx] != "" {
-                                       os.RemoveAll(bld.DataDirs[idx])
+                               if !bld.KeepDataDirsOnClose {
+                                       if bld.DataDirs[idx] != "" {
+                                               os.RemoveAll(bld.DataDirs[idx])
+                                       }
                                }
                        }
                        if rsv != nil {

Reply via email to