Repository: incubator-htrace
Updated Branches:
  refs/heads/master fc0d8f38f -> c715e12eb


HTRACE-302. htraced: Add admissions control to HRPC to limit the number of incoming messages (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c715e12e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c715e12e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c715e12e

Branch: refs/heads/master
Commit: c715e12eb085cf551e90567f80c78886a3cc07f6
Parents: fc0d8f3
Author: Colin P. Mccabe <[email protected]>
Authored: Thu Nov 19 16:45:42 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Thu Nov 19 16:52:56 2015 -0800

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/client/client.go   |  28 ++-
 .../go/src/org/apache/htrace/client/hclient.go  |  11 +-
 .../go/src/org/apache/htrace/common/log.go      |   4 +
 .../src/org/apache/htrace/conf/config_keys.go   |  12 ++
 .../org/apache/htrace/htraced/client_test.go    | 133 ++++++++++++-
 .../src/org/apache/htrace/htraced/datastore.go  |   1 +
 .../org/apache/htrace/htraced/datastore_test.go |   4 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    | 193 ++++++++++++++-----
 .../go/src/org/apache/htrace/htraced/htraced.go |   2 +-
 .../org/apache/htrace/htraced/metrics_test.go   |   7 +-
 .../org/apache/htrace/htraced/mini_htraced.go   |   6 +-
 .../go/src/org/apache/htrace/htracedTool/cmd.go |   2 +-
 12 files changed, 333 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go 
b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 1140209..0028545 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -33,25 +33,35 @@ import (
 
 // A golang client for htraced.
 // TODO: fancier APIs for streaming spans in the background, optimize TCP stuff
-
-func NewClient(cnf *conf.Config) (*Client, error) {
-       hcl := Client{}
+func NewClient(cnf *conf.Config, testHooks *TestHooks) (*Client, error) {
+       hcl := Client{testHooks: testHooks}
        hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
-       hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+       if testHooks != nil && testHooks.HrpcDisabled {
+               hcl.hrpcAddr = ""
+       } else {
+               hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+       }
        return &hcl, nil
 }
 
+type TestHooks struct {
+       // If true, HRPC is disabled.
+       HrpcDisabled bool
+
+       // A function which gets called after we connect to the server and send the
+       // message frame, but before we write the message body.
+       HandleWriteRequestBody func()
+}
+
 type Client struct {
        // REST address of the htraced server.
        restAddr string
 
        // HRPC address of the htraced server.
        hrpcAddr string
-}
 
-// Disable HRPC
-func (hcl *Client) DisableHrpc() {
-       hcl.hrpcAddr = ""
+       // The test hooks to use, or nil if test hooks are not enabled.
+       testHooks *TestHooks
 }
 
 // Get the htraced server version information.
@@ -136,7 +146,7 @@ func (hcl *Client) WriteSpans(req *common.WriteSpansReq) 
error {
        if hcl.hrpcAddr == "" {
                return hcl.writeSpansHttp(req)
        }
-       hcr, err := newHClient(hcl.hrpcAddr)
+       hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks)
        if err != nil {
                return err
        }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go 
b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
index 608dd59..ef79deb 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go
@@ -38,6 +38,7 @@ type hClient struct {
 type HrpcClientCodec struct {
        rwc    io.ReadWriteCloser
        length uint32
+       testHooks *TestHooks
 }
 
 func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) 
error {
@@ -72,6 +73,9 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, 
msg interface{}) erro
                return errors.New(fmt.Sprintf("Error writing header bytes: %s",
                        err.Error()))
        }
+       if cdc.testHooks != nil && cdc.testHooks.HandleWriteRequestBody != nil {
+               cdc.testHooks.HandleWriteRequestBody()
+       }
        _, err = cdc.rwc.Write(buf)
        if err != nil {
                return errors.New(fmt.Sprintf("Error writing body bytes: %s",
@@ -136,14 +140,17 @@ func (cdc *HrpcClientCodec) Close() error {
        return cdc.rwc.Close()
 }
 
-func newHClient(hrpcAddr string) (*hClient, error) {
+func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) {
        hcr := hClient{}
        conn, err := net.Dial("tcp", hrpcAddr)
        if err != nil {
                return nil, errors.New(fmt.Sprintf("Error contacting the HRPC 
server "+
                        "at %s: %s", hrpcAddr, err.Error()))
        }
-       hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
+       hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{
+               rwc: conn,
+               testHooks: testHooks,
+       })
        return &hcr, nil
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/common/log.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go 
b/htrace-htraced/go/src/org/apache/htrace/common/log.go
index 4066094..8a26507 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/log.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go
@@ -291,6 +291,10 @@ func (lg *Logger) ErrorEnabled() bool {
        return lg.Level <= ERROR
 }
 
+func (lg *Logger) LevelEnabled(level Level) bool {
+       return lg.Level <= level
+}
+
 func (lg *Logger) Close() {
        lg.sink.Unref()
        lg.sink = nil

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go 
b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
index d10f3af..511833c 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go
@@ -86,6 +86,16 @@ const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = 
"reaper.heartbeat.period.ms"
 // started.
 const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address"
 
+// The maximum number of HRPC handler goroutines we will create at once.  If
+// this is too small, we won't get enough concurrency; if it's too big, we will
+// buffer too much data in memory while waiting for the datastore to process
+// requests.
+const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers"
+
+// The I/O timeout HRPC will use, in milliseconds.  If it takes longer than
+// this to read or write a message, we will abort the connection.
+const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms"
+
 // Default values for HTrace configuration keys.
 var DEFAULTS = map[string]string{
        HTRACE_WEB_ADDRESS:  fmt.Sprintf("0.0.0.0:%d", 
HTRACE_WEB_ADDRESS_DEFAULT_PORT),
@@ -100,6 +110,8 @@ var DEFAULTS = map[string]string{
        HTRACE_METRICS_MAX_ADDR_ENTRIES:    "100000",
        HTRACE_SPAN_EXPIRY_MS:              "0",
        HTRACE_REAPER_HEARTBEAT_PERIOD_MS:  fmt.Sprintf("%d", 90*1000),
+       HTRACE_NUM_HRPC_HANDLERS:           "20",
+       HTRACE_HRPC_IO_TIMEOUT_MS:          "60000",
 }
 
 // Values to be used when creating test configurations

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
index 9a51cd4..e4f2151 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go
@@ -22,12 +22,15 @@ package main
 import (
        "fmt"
        "math/rand"
-       htrace "org/apache/htrace/client"
        "org/apache/htrace/common"
+       "org/apache/htrace/conf"
        "org/apache/htrace/test"
        "sort"
        "testing"
        "time"
+       "sync"
+       "sync/atomic"
+       htrace "org/apache/htrace/client"
 )
 
 func TestClientGetServerVersion(t *testing.T) {
@@ -39,7 +42,7 @@ func TestClientGetServerVersion(t *testing.T) {
        }
        defer ht.Close()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
@@ -58,7 +61,7 @@ func TestClientGetServerDebugInfo(t *testing.T) {
        }
        defer ht.Close()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
@@ -95,7 +98,7 @@ func TestClientOperations(t *testing.T) {
        }
        defer ht.Close()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
@@ -185,7 +188,7 @@ func TestDumpAll(t *testing.T) {
        }
        defer ht.Close()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
@@ -246,7 +249,7 @@ func TestClientGetServerConf(t *testing.T) {
        }
        defer ht.Close()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
@@ -259,3 +262,121 @@ func TestClientGetServerConf(t *testing.T) {
                        EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE)
        }
 }
+
+const TEST_NUM_HRPC_HANDLERS = 2
+
+const TEST_NUM_WRITESPANS = 4
+
+// Tests that HRPC limits the number of simultaneous connections being processed.
+func TestHrpcAdmissionsControl(t *testing.T) {
+       var wg sync.WaitGroup
+       wg.Add(TEST_NUM_WRITESPANS)
+       var numConcurrentHrpcCalls int32
+       testHooks := &hrpcTestHooks {
+               HandleAdmission: func() {
+                       defer wg.Done()
+                       n := atomic.AddInt32(&numConcurrentHrpcCalls, 1)
+                       if n > TEST_NUM_HRPC_HANDLERS {
+                               t.Fatalf("The number of concurrent HRPC calls 
went above " +
+                                       "%d: it's at %d\n", 
TEST_NUM_HRPC_HANDLERS, n)
+                       }
+                       time.Sleep(1 * time.Millisecond)
+                       n = atomic.AddInt32(&numConcurrentHrpcCalls, -1)
+                       if n >= TEST_NUM_HRPC_HANDLERS {
+                               t.Fatalf("The number of concurrent HRPC calls 
went above " +
+                                       "%d: it was at %d\n", 
TEST_NUM_HRPC_HANDLERS, n + 1)
+                       }
+               },
+       }
+       htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl",
+               DataDirs: make([]string, 2),
+               Cnf: map[string]string{
+                       conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", 
TEST_NUM_HRPC_HANDLERS),
+               },
+               WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS),
+               HrpcTestHooks: testHooks,
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       // Create some random trace spans.
+       allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+       for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+               go func(i int) {
+                       err = hcl.WriteSpans(&common.WriteSpansReq{
+                               Spans: allSpans[i:i+1],
+                       })
+                       if err != nil {
+                               t.Fatalf("WriteSpans failed: %s\n", err.Error())
+                       }
+               }(iter)
+       }
+       wg.Wait()
+       for i := 0; i < TEST_NUM_WRITESPANS; i++ {
+               <-ht.Store.WrittenSpans
+       }
+}
+
+// Tests that HRPC I/O timeouts work.
+func TestHrpcIoTimeout(t *testing.T) {
+       htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout",
+               DataDirs: make([]string, 2),
+               Cnf: map[string]string{
+                       conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", 
TEST_NUM_HRPC_HANDLERS),
+                       conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1",
+               },
+       }
+       ht, err := htraceBld.Build()
+       if err != nil {
+               t.Fatalf("failed to create datastore: %s", err.Error())
+       }
+       defer ht.Close()
+       var hcl *htrace.Client
+       finishClient := make(chan interface{})
+       defer func() {
+               // Close the finishClient channel, if it hasn't already been closed.
+               defer func() {recover()}()
+               close(finishClient)
+       }()
+       testHooks := &htrace.TestHooks {
+               HandleWriteRequestBody: func() {
+                       <-finishClient
+               },
+       }
+       hcl, err = htrace.NewClient(ht.ClientConf(), testHooks)
+       if err != nil {
+               t.Fatalf("failed to create client: %s", err.Error())
+       }
+       // Create some random trace spans.
+       allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS)
+       var wg sync.WaitGroup
+       wg.Add(TEST_NUM_WRITESPANS)
+       for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ {
+               go func(i int) {
+                       defer wg.Done()
+                       // Ignore the error return because there are internal retries in
+                       // the client which will make this succeed eventually, usually.
+                       // Keep in mind that we only block until we have seen
+                       // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that,
+                       // we let requests through so that the test can exit cleanly.
+                       hcl.WriteSpans(&common.WriteSpansReq{
+                               Spans: allSpans[i:i+1],
+                       })
+               }(iter)
+       }
+       for {
+               if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS {
+                       break
+               }
+               time.Sleep(1000 * time.Nanosecond)
+       }
+       close(finishClient)
+       wg.Wait()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index ab2747b..a4bb320 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -289,6 +289,7 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error {
        }
        shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg)
        if shd.store.WrittenSpans != nil {
+               shd.store.lg.Errorf("WATERMELON: Sending span to 
shd.store.WrittenSpans\n")
                shd.store.WrittenSpans <- span
        }
        return nil

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
index 576ee0b..0443834 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -410,7 +410,7 @@ func TestReloadDataStore(t *testing.T) {
                }
        }()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
@@ -444,7 +444,7 @@ func TestReloadDataStore(t *testing.T) {
        if err != nil {
                t.Fatalf("failed to re-create datastore: %s", err.Error())
        }
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), nil)
        if err != nil {
                t.Fatalf("failed to re-create client: %s", err.Error())
        }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
index 0d72602..a0f2e81 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go
@@ -33,9 +33,13 @@ import (
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "reflect"
+       "sync"
+       "sync/atomic"
        "time"
 )
 
+const MAX_HRPC_HANDLERS = 32765
+
 // Handles HRPC calls
 type HrpcHandler struct {
        lg    *common.Logger
@@ -46,14 +50,57 @@ type HrpcHandler struct {
 type HrpcServer struct {
        *rpc.Server
        hand     *HrpcHandler
+
+       // The listener we are using to accept new connections.
        listener net.Listener
+
+       // A WaitGroup used to block until the HRPC server has exited.
+       exited   sync.WaitGroup
+
+       // A channel containing server codecs to use.  This channel is fully
+       // buffered.  The number of entries it initially contains determines how
+       // many concurrent codecs we will have running at once.
+       cdcs     chan *HrpcServerCodec
+
+       // Used to shut down
+       shutdown chan interface{}
+
+       // The I/O timeout to use when reading requests or sending responses.  This
+       // timeout does not apply to the time we spend processing the message.
+       ioTimeo    time.Duration
+
+       // A count of all I/O errors that we have encountered since the server
+       // started.  This counts errors like improperly formatted message frames,
+       // but not errors like properly formatted but invalid messages.
+       // This count is updated from multiple goroutines via sync/atomic.
+       ioErrorCount  uint64
+
+       // The test hooks to use, or nil during normal operation.
+       testHooks *hrpcTestHooks
+}
+
+type hrpcTestHooks struct {
+       // A callback we make right after calling Accept() but before reading from
+       // the new connection.
+       HandleAdmission func()
 }
 
-// Codec which encodes HRPC data via JSON
+// A codec which encodes HRPC data via JSON.  This structure holds the context
+// for a particular client connection.
 type HrpcServerCodec struct {
        lg     *common.Logger
+
+       // The current connection.
        conn   net.Conn
+
+       // The HrpcServer which this connection is part of.
+       hsv    *HrpcServer
+
+       // The message length we read from the header.
        length uint32
+
+       // The number of messages this connection has handled.
+       numHandled int
 }
 
 func asJson(val interface{}) string {
@@ -64,45 +111,51 @@ func asJson(val interface{}) string {
        return string(js)
 }
 
-func createErrAndWarn(lg *common.Logger, val string) error {
-       return createErrAndLog(lg, val, common.WARN)
+func newIoErrorWarn(cdc *HrpcServerCodec, val string) error {
+       return newIoError(cdc, val, common.WARN)
 }
 
-func createErrAndLog(lg *common.Logger, val string, level common.Level) error {
-       lg.Write(level, val+"\n")
+func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error {
+       if cdc.lg.LevelEnabled(level) {
+               cdc.lg.Write(level, cdc.conn.RemoteAddr().String() + ": " + val 
+ "\n")
+       }
+       if level >= common.INFO {
+               atomic.AddUint64(&cdc.hsv.ioErrorCount, 1)
+       }
        return errors.New(val)
 }
 
 func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
        hdr := common.HrpcRequestHeader{}
        if cdc.lg.TraceEnabled() {
-               cdc.lg.Tracef("Reading HRPC request header from %s\n", 
cdc.conn.RemoteAddr())
+               cdc.lg.Tracef("%s: Reading HRPC request header.\n", 
cdc.conn.RemoteAddr())
        }
+       cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
        err := binary.Read(cdc.conn, binary.LittleEndian, &hdr)
        if err != nil {
-               level := common.WARN
-               if err == io.EOF {
-                       level = common.DEBUG
+               if err == io.EOF && cdc.numHandled > 0 {
+                       return newIoError(cdc, fmt.Sprintf("Remote closed 
connection " +
+                               "after writing %d message(s)", cdc.numHandled), 
common.DEBUG)
                }
-               return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading 
header bytes: %s",
-                       err.Error()), level)
+               return newIoError(cdc,
+                       fmt.Sprintf("Error reading request header: %s", 
err.Error()), common.WARN)
        }
        if cdc.lg.TraceEnabled() {
-               cdc.lg.Tracef("Read HRPC request header %s from %s\n",
-                       asJson(&hdr), cdc.conn.RemoteAddr())
+               cdc.lg.Tracef("%s: Read HRPC request header %s\n",
+                       cdc.conn.RemoteAddr(), asJson(&hdr))
        }
        if hdr.Magic != common.HRPC_MAGIC {
-               return createErrAndWarn(cdc.lg, fmt.Sprintf("Invalid request 
header: expected "+
+               return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: 
expected "+
                        "magic number of 0x%04x, but got 0x%04x", 
common.HRPC_MAGIC, hdr.Magic))
        }
        if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
-               return createErrAndWarn(cdc.lg, fmt.Sprintf("Length prefix was 
too long.  "+
+               return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too 
long.  "+
                        "Maximum length is %d, but we got %d.", 
common.MAX_HRPC_BODY_LENGTH,
                        hdr.Length))
        }
        req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
        if req.ServiceMethod == "" {
-               return createErrAndWarn(cdc.lg, fmt.Sprintf("Unknown MethodID 
code 0x%04x",
+               return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 
0x%04x",
                        hdr.MethodId))
        }
        req.Seq = hdr.Seq
@@ -111,34 +164,36 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req 
*rpc.Request) error {
 }
 
 func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
-       remoteAddr := cdc.conn.RemoteAddr()
        if cdc.lg.TraceEnabled() {
-               cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n",
-                       cdc.length, remoteAddr)
+               cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n",
+                       cdc.conn.RemoteAddr(), cdc.length)
        }
        mh := new(codec.MsgpackHandle)
        mh.WriteExt = true
        dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh)
        err := dec.Decode(body)
        if err != nil {
-               return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read 
request "+
-                       "body from %s: %s", remoteAddr, err.Error()))
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte 
" +
+                       "request body: %s", cdc.length, err.Error()))
        }
        if cdc.lg.TraceEnabled() {
-               cdc.lg.Tracef("Read body from %s: %s\n",
-                       remoteAddr, asJson(&body))
+               cdc.lg.Tracef("%s: read %d-byte request body %s\n",
+                       cdc.conn.RemoteAddr(), cdc.length, asJson(&body))
        }
        val := reflect.ValueOf(body)
        addr := val.Elem().FieldByName("Addr")
        if addr.IsValid() {
-               addr.SetString(remoteAddr.String())
+               addr.SetString(cdc.conn.RemoteAddr().String())
        }
+       var zeroTime time.Time
+       cdc.conn.SetDeadline(zeroTime)
        return nil
 }
 
 var EMPTY []byte = make([]byte, 0)
 
 func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) 
error {
+       cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo))
        var err error
        buf := EMPTY
        if msg != nil {
@@ -148,7 +203,7 @@ func (cdc *HrpcServerCodec) WriteResponse(resp 
*rpc.Response, msg interface{}) e
                enc := codec.NewEncoder(w, mh)
                err := enc.Encode(msg)
                if err != nil {
-                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
marshal "+
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to 
marshal "+
                                "response message: %s", err.Error()))
                }
                buf = w.Bytes()
@@ -161,13 +216,13 @@ func (cdc *HrpcServerCodec) WriteResponse(resp 
*rpc.Response, msg interface{}) e
        writer := bufio.NewWriterSize(cdc.conn, 256)
        err = binary.Write(writer, binary.LittleEndian, &hdr)
        if err != nil {
-               return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write 
response "+
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
response "+
                        "header: %s", err.Error()))
        }
        if hdr.ErrLength > 0 {
                _, err = io.WriteString(writer, resp.Error)
                if err != nil {
-                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
write error "+
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
error "+
                                "string: %s", err.Error()))
                }
        }
@@ -175,24 +230,30 @@ func (cdc *HrpcServerCodec) WriteResponse(resp 
*rpc.Response, msg interface{}) e
                var length int
                length, err = writer.Write(buf)
                if err != nil {
-                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
write response "+
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
response "+
                                "message: %s", err.Error()))
                }
                if uint32(length) != hdr.Length {
-                       return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to 
write all of "+
+                       return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write 
all of "+
                                "response message: %s", err.Error()))
                }
        }
        err = writer.Flush()
        if err != nil {
-               return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write 
the response "+
+               return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the 
response "+
                        "bytes: %s", err.Error()))
        }
+       cdc.numHandled++
        return nil
 }
 
 func (cdc *HrpcServerCodec) Close() error {
-       return cdc.conn.Close()
+       err := cdc.conn.Close()
+       cdc.conn = nil
+       cdc.length = 0
+       cdc.numHandled = 0
+       cdc.hsv.cdcs <- cdc
+       return err
 }
 
 func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
@@ -228,14 +289,36 @@ func (hand *HrpcHandler) WriteSpans(req 
*common.WriteSpansReq,
        return nil
 }
 
-func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) 
{
+func CreateHrpcServer(cnf *conf.Config, store *dataStore,
+               testHooks *hrpcTestHooks) (*HrpcServer, error) {
        lg := common.NewLogger("hrpc", cnf)
+       numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS)
+       if numHandlers < 1 {
+               lg.Warnf("%s must be positive: using 1 handler.\n", 
conf.HTRACE_NUM_HRPC_HANDLERS)
+               numHandlers = 1
+       }
+       if numHandlers > MAX_HRPC_HANDLERS {
+               lg.Warnf("%s cannot be more than %d: using %d handlers\n",
+                       conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, 
MAX_HRPC_HANDLERS)
+               numHandlers = MAX_HRPC_HANDLERS
+       }
        hsv := &HrpcServer{
                Server: rpc.NewServer(),
                hand: &HrpcHandler{
                        lg:    lg,
                        store: store,
                },
+               cdcs: make(chan *HrpcServerCodec, numHandlers),
+               shutdown: make(chan interface{}),
+               ioTimeo: time.Millisecond *
+                       
time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)),
+               testHooks: testHooks,
+       }
+       for i := 0; i < numHandlers; i++ {
+               hsv.cdcs <- &HrpcServerCodec{
+                       lg:   lg,
+                       hsv:  hsv,
+               }
        }
        var err error
        hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
@@ -243,26 +326,42 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore) 
(*HrpcServer, error) {
                return nil, err
        }
        hsv.Server.Register(hsv.hand)
+       hsv.exited.Add(1)
        go hsv.run()
-       lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+       lg.Infof("Started HRPC server on %s with %d handler routines. " +
+               "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers,
+               hsv.ioTimeo.String())
        return hsv, nil
 }
 
 func (hsv *HrpcServer) run() {
        lg := hsv.hand.lg
+       srvAddr := hsv.listener.Addr().String()
+       defer func() {
+               lg.Infof("HrpcServer on %s exiting\n", srvAddr)
+               hsv.exited.Done()
+       }()
        for {
-               conn, err := hsv.listener.Accept()
-               if err != nil {
-                       lg.Errorf("HRPC Accept error: %s\n", err.Error())
-                       continue
-               }
-               if lg.TraceEnabled() {
-                       lg.Tracef("Accepted HRPC connection from %s\n", 
conn.RemoteAddr())
+               select {
+               case cdc:=<-hsv.cdcs:
+                       conn, err := hsv.listener.Accept()
+                       if err != nil {
+                               lg.Errorf("HrpcServer on %s got accept error: 
%s\n", srvAddr, err.Error())
+                               hsv.cdcs<-cdc // never blocks; there is always 
sufficient buffer space
+                               continue
+                       }
+                       if lg.TraceEnabled() {
+                               lg.Tracef("%s: Accepted HRPC connection.\n", 
conn.RemoteAddr())
+                       }
+                       cdc.conn = conn
+                       cdc.numHandled = 0
+                       if hsv.testHooks != nil && 
hsv.testHooks.HandleAdmission != nil {
+                               hsv.testHooks.HandleAdmission()
+                       }
+                       go hsv.ServeCodec(cdc)
+               case <-hsv.shutdown:
+                       return
                }
-               go hsv.ServeCodec(&HrpcServerCodec{
-                       lg:   lg,
-                       conn: conn,
-               })
        }
 }
 
@@ -270,6 +369,12 @@ func (hsv *HrpcServer) Addr() net.Addr {
        return hsv.listener.Addr()
 }
 
+func (hsv *HrpcServer) GetNumIoErrors() uint64 {
+       return atomic.LoadUint64(&hsv.ioErrorCount)
+}
+
 func (hsv *HrpcServer) Close() {
+       close(hsv.shutdown)
        hsv.listener.Close()
+       hsv.exited.Wait()
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
index 97b72ca..5b0dfc6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go
@@ -110,7 +110,7 @@ func main() {
        }
        var hsv *HrpcServer
        if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
-               hsv, err = CreateHrpcServer(cnf, store)
+               hsv, err = CreateHrpcServer(cnf, store, nil)
                if err != nil {
                        lg.Errorf("Error creating HRPC server: %s\n", err.Error())
                        os.Exit(1)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
index 48c20f0..5243d9e 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go
@@ -205,13 +205,12 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) {
        }
        defer ht.Close()
        var hcl *htrace.Client
-       hcl, err = htrace.NewClient(ht.ClientConf())
+       hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks {
+               HrpcDisabled: !usePacked,
+       })
        if err != nil {
                t.Fatalf("failed to create client: %s", err.Error())
        }
-       if !usePacked {
-               hcl.DisableHrpc()
-       }
 
        NUM_TEST_SPANS := 12
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
index a50799a..353beae 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -55,6 +55,9 @@ type MiniHTracedBuilder struct {
 
        // If non-null, the WrittenSpans channel to use when creating the DataStore.
        WrittenSpans chan *common.Span
+
+       // The test hooks to use for the HRPC server
+       HrpcTestHooks *hrpcTestHooks
 }
 
 type MiniHTraced struct {
@@ -141,7 +144,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
                return nil, err
        }
        rstListener = nil
-       hsv, err = CreateHrpcServer(cnf, store)
+       hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks)
        if err != nil {
                return nil, err
        }
@@ -175,6 +178,7 @@ func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config {
 func (ht *MiniHTraced) Close() {
        ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name)
        ht.Rsv.Close()
+       ht.Hsv.Close()
        ht.Store.Close()
        if !ht.KeepDataDirsOnClose {
                for idx := range ht.DataDirs {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
index 394e1c1..7b5e433 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go
@@ -123,7 +123,7 @@ func main() {
        }
 
        // Create HTrace client
-       hcl, err := htrace.NewClient(cnf)
+       hcl, err := htrace.NewClient(cnf, nil)
        if err != nil {
                fmt.Printf("Failed to create HTrace client: %s\n", err.Error())
                os.Exit(EXIT_FAILURE)

Reply via email to