Repository: incubator-htrace Updated Branches: refs/heads/master fc0d8f38f -> c715e12eb
HTRACE-302. htraced: Add admissions control to HRPC to limit the number of incoming messages (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/c715e12e Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/c715e12e Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/c715e12e Branch: refs/heads/master Commit: c715e12eb085cf551e90567f80c78886a3cc07f6 Parents: fc0d8f3 Author: Colin P. Mccabe <[email protected]> Authored: Thu Nov 19 16:45:42 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Thu Nov 19 16:52:56 2015 -0800 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/client/client.go | 28 ++- .../go/src/org/apache/htrace/client/hclient.go | 11 +- .../go/src/org/apache/htrace/common/log.go | 4 + .../src/org/apache/htrace/conf/config_keys.go | 12 ++ .../org/apache/htrace/htraced/client_test.go | 133 ++++++++++++- .../src/org/apache/htrace/htraced/datastore.go | 1 + .../org/apache/htrace/htraced/datastore_test.go | 4 +- .../go/src/org/apache/htrace/htraced/hrpc.go | 193 ++++++++++++++----- .../go/src/org/apache/htrace/htraced/htraced.go | 2 +- .../org/apache/htrace/htraced/metrics_test.go | 7 +- .../org/apache/htrace/htraced/mini_htraced.go | 6 +- .../go/src/org/apache/htrace/htracedTool/cmd.go | 2 +- 12 files changed, 333 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go index 1140209..0028545 100644 --- a/htrace-htraced/go/src/org/apache/htrace/client/client.go +++ 
b/htrace-htraced/go/src/org/apache/htrace/client/client.go @@ -33,25 +33,35 @@ import ( // A golang client for htraced. // TODO: fancier APIs for streaming spans in the background, optimize TCP stuff - -func NewClient(cnf *conf.Config) (*Client, error) { - hcl := Client{} +func NewClient(cnf *conf.Config, testHooks *TestHooks) (*Client, error) { + hcl := Client{testHooks: testHooks} hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS) - hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS) + if testHooks != nil && testHooks.HrpcDisabled { + hcl.hrpcAddr = "" + } else { + hcl.hrpcAddr = cnf.Get(conf.HTRACE_HRPC_ADDRESS) + } return &hcl, nil } +type TestHooks struct { + // If true, HRPC is disabled. + HrpcDisabled bool + + // A function which gets called after we connect to the server and send the + // message frame, but before we write the message body. + HandleWriteRequestBody func() +} + type Client struct { // REST address of the htraced server. restAddr string // HRPC address of the htraced server. hrpcAddr string -} -// Disable HRPC -func (hcl *Client) DisableHrpc() { - hcl.hrpcAddr = "" + // The test hooks to use, or nil if test hooks are not enabled. + testHooks *TestHooks } // Get the htraced server version information. 
@@ -136,7 +146,7 @@ func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error { if hcl.hrpcAddr == "" { return hcl.writeSpansHttp(req) } - hcr, err := newHClient(hcl.hrpcAddr) + hcr, err := newHClient(hcl.hrpcAddr, hcl.testHooks) if err != nil { return err } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/client/hclient.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go index 608dd59..ef79deb 100644 --- a/htrace-htraced/go/src/org/apache/htrace/client/hclient.go +++ b/htrace-htraced/go/src/org/apache/htrace/client/hclient.go @@ -38,6 +38,7 @@ type hClient struct { type HrpcClientCodec struct { rwc io.ReadWriteCloser length uint32 + testHooks *TestHooks } func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) error { @@ -72,6 +73,9 @@ func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) erro return errors.New(fmt.Sprintf("Error writing header bytes: %s", err.Error())) } + if cdc.testHooks != nil && cdc.testHooks.HandleWriteRequestBody != nil { + cdc.testHooks.HandleWriteRequestBody() + } _, err = cdc.rwc.Write(buf) if err != nil { return errors.New(fmt.Sprintf("Error writing body bytes: %s", @@ -136,14 +140,17 @@ func (cdc *HrpcClientCodec) Close() error { return cdc.rwc.Close() } -func newHClient(hrpcAddr string) (*hClient, error) { +func newHClient(hrpcAddr string, testHooks *TestHooks) (*hClient, error) { hcr := hClient{} conn, err := net.Dial("tcp", hrpcAddr) if err != nil { return nil, errors.New(fmt.Sprintf("Error contacting the HRPC server "+ "at %s: %s", hrpcAddr, err.Error())) } - hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn}) + hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{ + rwc: conn, + testHooks: testHooks, + }) return &hcr, nil } 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/common/log.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/log.go b/htrace-htraced/go/src/org/apache/htrace/common/log.go index 4066094..8a26507 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/log.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/log.go @@ -291,6 +291,10 @@ func (lg *Logger) ErrorEnabled() bool { return lg.Level <= ERROR } +func (lg *Logger) LevelEnabled(level Level) bool { + return lg.Level <= level +} + func (lg *Logger) Close() { lg.sink.Unref() lg.sink = nil http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go index d10f3af..511833c 100644 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go +++ b/htrace-htraced/go/src/org/apache/htrace/conf/config_keys.go @@ -86,6 +86,16 @@ const HTRACE_REAPER_HEARTBEAT_PERIOD_MS = "reaper.heartbeat.period.ms" // started. const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address" +// The maximum number of HRPC handler goroutines we will create at once. If +// this is too small, we won't get enough concurrency; if it's too big, we will +// buffer too much data in memory while waiting for the datastore to process +// requests. +const HTRACE_NUM_HRPC_HANDLERS = "num.hrpc.handlers" + +// The I/O timeout HRPC will use, in milliseconds. If it takes longer than +// this to read or write a message, we will abort the connection. +const HTRACE_HRPC_IO_TIMEOUT_MS = "hrpc.io.timeout.ms" + // Default values for HTrace configuration keys. 
var DEFAULTS = map[string]string{ HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), @@ -100,6 +110,8 @@ var DEFAULTS = map[string]string{ HTRACE_METRICS_MAX_ADDR_ENTRIES: "100000", HTRACE_SPAN_EXPIRY_MS: "0", HTRACE_REAPER_HEARTBEAT_PERIOD_MS: fmt.Sprintf("%d", 90*1000), + HTRACE_NUM_HRPC_HANDLERS: "20", + HTRACE_HRPC_IO_TIMEOUT_MS: "60000", } // Values to be used when creating test configurations http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go index 9a51cd4..e4f2151 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/client_test.go @@ -22,12 +22,15 @@ package main import ( "fmt" "math/rand" - htrace "org/apache/htrace/client" "org/apache/htrace/common" + "org/apache/htrace/conf" "org/apache/htrace/test" "sort" "testing" "time" + "sync" + "sync/atomic" + htrace "org/apache/htrace/client" ) func TestClientGetServerVersion(t *testing.T) { @@ -39,7 +42,7 @@ func TestClientGetServerVersion(t *testing.T) { } defer ht.Close() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } @@ -58,7 +61,7 @@ func TestClientGetServerDebugInfo(t *testing.T) { } defer ht.Close() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } @@ -95,7 +98,7 @@ func TestClientOperations(t *testing.T) { } defer ht.Close() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = 
htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } @@ -185,7 +188,7 @@ func TestDumpAll(t *testing.T) { } defer ht.Close() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } @@ -246,7 +249,7 @@ func TestClientGetServerConf(t *testing.T) { } defer ht.Close() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } @@ -259,3 +262,121 @@ func TestClientGetServerConf(t *testing.T) { EXAMPLE_CONF_KEY, EXAMPLE_CONF_VALUE) } } + +const TEST_NUM_HRPC_HANDLERS = 2 + +const TEST_NUM_WRITESPANS = 4 + +// Tests that HRPC limits the number of simultaneous connections being processed. +func TestHrpcAdmissionsControl(t *testing.T) { + var wg sync.WaitGroup + wg.Add(TEST_NUM_WRITESPANS) + var numConcurrentHrpcCalls int32 + testHooks := &hrpcTestHooks { + HandleAdmission: func() { + defer wg.Done() + n := atomic.AddInt32(&numConcurrentHrpcCalls, 1) + if n > TEST_NUM_HRPC_HANDLERS { + t.Fatalf("The number of concurrent HRPC calls went above " + + "%d: it's at %d\n", TEST_NUM_HRPC_HANDLERS, n) + } + time.Sleep(1 * time.Millisecond) + n = atomic.AddInt32(&numConcurrentHrpcCalls, -1) + if n >= TEST_NUM_HRPC_HANDLERS { + t.Fatalf("The number of concurrent HRPC calls went above " + + "%d: it was at %d\n", TEST_NUM_HRPC_HANDLERS, n + 1) + } + }, + } + htraceBld := &MiniHTracedBuilder{Name: "TestHrpcAdmissionsControl", + DataDirs: make([]string, 2), + Cnf: map[string]string{ + conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), + }, + WrittenSpans: make(chan *common.Span, TEST_NUM_WRITESPANS), + HrpcTestHooks: testHooks, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + 
defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), nil) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + // Create some random trace spans. + allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) + for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { + go func(i int) { + err = hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans[i:i+1], + }) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + }(iter) + } + wg.Wait() + for i := 0; i < TEST_NUM_WRITESPANS; i++ { + <-ht.Store.WrittenSpans + } +} + +// Tests that HRPC I/O timeouts work. +func TestHrpcIoTimeout(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestHrpcIoTimeout", + DataDirs: make([]string, 2), + Cnf: map[string]string{ + conf.HTRACE_NUM_HRPC_HANDLERS: fmt.Sprintf("%d", TEST_NUM_HRPC_HANDLERS), + conf.HTRACE_HRPC_IO_TIMEOUT_MS: "1", + }, + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + finishClient := make(chan interface{}) + defer func() { + // Close the finishClient channel, if it hasn't already been closed. + defer func() {recover()}() + close(finishClient) + }() + testHooks := &htrace.TestHooks { + HandleWriteRequestBody: func() { + <-finishClient + }, + } + hcl, err = htrace.NewClient(ht.ClientConf(), testHooks) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + // Create some random trace spans. + allSpans := createRandomTestSpans(TEST_NUM_WRITESPANS) + var wg sync.WaitGroup + wg.Add(TEST_NUM_WRITESPANS) + for iter := 0; iter < TEST_NUM_WRITESPANS; iter++ { + go func(i int) { + defer wg.Done() + // Ignore the error return because there are internal retries in + // the client which will make this succeed eventually, usually. 
+ // Keep in mind that we only block until we have seen + // TEST_NUM_WRITESPANS I/O errors in the HRPC server-- after that, + // we let requests through so that the test can exit cleanly. + hcl.WriteSpans(&common.WriteSpansReq{ + Spans: allSpans[i:i+1], + }) + }(iter) + } + for { + if ht.Hsv.GetNumIoErrors() >= TEST_NUM_WRITESPANS { + break + } + time.Sleep(1000 * time.Nanosecond) + } + close(finishClient) + wg.Wait() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index ab2747b..a4bb320 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -289,6 +289,7 @@ func (shd *shard) writeSpan(ispan *IncomingSpan) error { } shd.mtxMap.IncrementWritten(ispan.Addr, shd.maxMtx, shd.store.lg) if shd.store.WrittenSpans != nil { + shd.store.lg.Errorf("WATERMELON: Sending span to shd.store.WrittenSpans\n") shd.store.WrittenSpans <- span } return nil http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go index 576ee0b..0443834 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore_test.go @@ -410,7 +410,7 @@ func TestReloadDataStore(t *testing.T) { } }() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to create client: 
%s", err.Error()) } @@ -444,7 +444,7 @@ func TestReloadDataStore(t *testing.T) { if err != nil { t.Fatalf("failed to re-create datastore: %s", err.Error()) } - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), nil) if err != nil { t.Fatalf("failed to re-create client: %s", err.Error()) } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go index 0d72602..a0f2e81 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/hrpc.go @@ -33,9 +33,13 @@ import ( "org/apache/htrace/common" "org/apache/htrace/conf" "reflect" + "sync" + "sync/atomic" "time" ) +const MAX_HRPC_HANDLERS = 32765 + // Handles HRPC calls type HrpcHandler struct { lg *common.Logger @@ -46,14 +50,57 @@ type HrpcHandler struct { type HrpcServer struct { *rpc.Server hand *HrpcHandler + + // The listener we are using to accept new connections. listener net.Listener + + // A WaitGroup used to block until the HRPC server has exited. + exited sync.WaitGroup + + // A channel containing server codecs to use. This channel is fully + // buffered. The number of entries it initially contains determines how + // many concurrent codecs we will have running at once. + cdcs chan *HrpcServerCodec + + // Closed by Close() to tell the accept loop in run() to exit. + shutdown chan interface{} + + // The I/O timeout to use when reading requests or sending responses. This + // timeout does not apply to the time we spend processing the message. + ioTimeo time.Duration + + // A count of all I/O errors that we have encountered since the server + // started. This counts errors like improperly formatted message frames, + // but not errors like properly formatted but invalid messages. 
+ // Errors reported below INFO level (such as the remote closing the + // connection after sending one or more messages) are not counted.
+ // This count is updated from multiple goroutines via sync/atomic. + ioErrorCount uint64 + + // The test hooks to use, or nil during normal operation. + testHooks *hrpcTestHooks +} + +type hrpcTestHooks struct { + // A callback we make right after calling Accept() but before reading from + // the new connection. + HandleAdmission func() } -// Codec which encodes HRPC data via JSON +// A codec for HRPC messages. Request bodies are MessagePack-encoded; JSON is +// only used when trace-logging message contents. This structure holds the +// context for a particular client connection. type HrpcServerCodec struct { lg *common.Logger + + // The current connection. conn net.Conn + + // The HrpcServer which this connection is part of. hsv *HrpcServer + + // The message length we read from the header. length uint32 + + // The number of messages this connection has handled. + numHandled int } func asJson(val interface{}) string { @@ -64,45 +111,51 @@ func asJson(val interface{}) string { return string(js) } -func createErrAndWarn(lg *common.Logger, val string) error { - return createErrAndLog(lg, val, common.WARN) +func newIoErrorWarn(cdc *HrpcServerCodec, val string) error { + return newIoError(cdc, val, common.WARN) } -func createErrAndLog(lg *common.Logger, val string, level common.Level) error { - lg.Write(level, val+"\n") +func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error { + if cdc.lg.LevelEnabled(level) { + cdc.lg.Write(level, cdc.conn.RemoteAddr().String() + ": " + val + "\n") + } + if level >= common.INFO { + atomic.AddUint64(&cdc.hsv.ioErrorCount, 1) + } return errors.New(val) } func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { hdr := common.HrpcRequestHeader{} if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("Reading HRPC request header from %s\n", cdc.conn.RemoteAddr()) + cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr()) } + cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo)) err := binary.Read(cdc.conn, binary.LittleEndian, &hdr) if err != nil { - level := common.WARN - if
err == io.EOF { - level = common.DEBUG + if err == io.EOF && cdc.numHandled > 0 { + return newIoError(cdc, fmt.Sprintf("Remote closed connection " + + "after writing %d message(s)", cdc.numHandled), common.DEBUG) } - return createErrAndLog(cdc.lg, fmt.Sprintf("Error reading header bytes: %s", - err.Error()), level) + return newIoError(cdc, + fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN) } if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("Read HRPC request header %s from %s\n", - asJson(&hdr), cdc.conn.RemoteAddr()) + cdc.lg.Tracef("%s: Read HRPC request header %s\n", + cdc.conn.RemoteAddr(), asJson(&hdr)) } if hdr.Magic != common.HRPC_MAGIC { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Invalid request header: expected "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+ "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic)) } if hdr.Length > common.MAX_HRPC_BODY_LENGTH { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Length prefix was too long. "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. 
"+ "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, hdr.Length)) } req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) if req.ServiceMethod == "" { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Unknown MethodID code 0x%04x", + return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x", hdr.MethodId)) } req.Seq = hdr.Seq @@ -111,34 +164,36 @@ func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { } func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { - remoteAddr := cdc.conn.RemoteAddr() if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("Reading HRPC %d-byte request body from %s\n", - cdc.length, remoteAddr) + cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n", + cdc.conn.RemoteAddr(), cdc.length) } mh := new(codec.MsgpackHandle) mh.WriteExt = true dec := codec.NewDecoder(io.LimitReader(cdc.conn, int64(cdc.length)), mh) err := dec.Decode(body) if err != nil { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to read request "+ - "body from %s: %s", remoteAddr, err.Error())) + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte " + + "request body: %s", cdc.length, err.Error())) } if cdc.lg.TraceEnabled() { - cdc.lg.Tracef("Read body from %s: %s\n", - remoteAddr, asJson(&body)) + cdc.lg.Tracef("%s: read %d-byte request body %s\n", + cdc.conn.RemoteAddr(), cdc.length, asJson(&body)) } val := reflect.ValueOf(body) addr := val.Elem().FieldByName("Addr") if addr.IsValid() { - addr.SetString(remoteAddr.String()) + addr.SetString(cdc.conn.RemoteAddr().String()) } + var zeroTime time.Time + cdc.conn.SetDeadline(zeroTime) return nil } var EMPTY []byte = make([]byte, 0) func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error { + cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo)) var err error buf := EMPTY if msg != nil { @@ -148,7 +203,7 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e enc := codec.NewEncoder(w, mh) 
err := enc.Encode(msg) if err != nil { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to marshal "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+ "response message: %s", err.Error())) } buf = w.Bytes() @@ -161,13 +216,13 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e writer := bufio.NewWriterSize(cdc.conn, 256) err = binary.Write(writer, binary.LittleEndian, &hdr) if err != nil { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+ "header: %s", err.Error())) } if hdr.ErrLength > 0 { _, err = io.WriteString(writer, resp.Error) if err != nil { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write error "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+ "string: %s", err.Error())) } } @@ -175,24 +230,30 @@ func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) e var length int length, err = writer.Write(buf) if err != nil { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write response "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+ "message: %s", err.Error())) } if uint32(length) != hdr.Length { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write all of "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+ "response message: %s", err.Error())) } } err = writer.Flush() if err != nil { - return createErrAndWarn(cdc.lg, fmt.Sprintf("Failed to write the response "+ + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+ "bytes: %s", err.Error())) } + cdc.numHandled++ return nil } func (cdc *HrpcServerCodec) Close() error { - return cdc.conn.Close() + err := cdc.conn.Close() + cdc.conn = nil + cdc.length = 0 + cdc.numHandled = 0 + cdc.hsv.cdcs <- cdc + return err } func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, @@ -228,14 +289,36 @@ func (hand *HrpcHandler) WriteSpans(req 
*common.WriteSpansReq, return nil } -func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) { +func CreateHrpcServer(cnf *conf.Config, store *dataStore, + testHooks *hrpcTestHooks) (*HrpcServer, error) { lg := common.NewLogger("hrpc", cnf) + numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS) + if numHandlers < 1 { + lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS) + numHandlers = 1 + } + if numHandlers > MAX_HRPC_HANDLERS { + lg.Warnf("%s cannot be more than %d: using %d handlers\n", + conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS) + numHandlers = MAX_HRPC_HANDLERS + } hsv := &HrpcServer{ Server: rpc.NewServer(), hand: &HrpcHandler{ lg: lg, store: store, }, + cdcs: make(chan *HrpcServerCodec, numHandlers), + shutdown: make(chan interface{}), + ioTimeo: time.Millisecond * + time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)), + testHooks: testHooks, + } + for i := 0; i < numHandlers; i++ { + hsv.cdcs <- &HrpcServerCodec{ + lg: lg, + hsv: hsv, + } } var err error hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS)) @@ -243,26 +326,42 @@ func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) { return nil, err } hsv.Server.Register(hsv.hand) + hsv.exited.Add(1) go hsv.run() - lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String()) + lg.Infof("Started HRPC server on %s with %d handler routines. 
" + + "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers, + hsv.ioTimeo.String()) return hsv, nil } func (hsv *HrpcServer) run() { lg := hsv.hand.lg + srvAddr := hsv.listener.Addr().String() + defer func() { + lg.Infof("HrpcServer on %s exiting\n", srvAddr) + hsv.exited.Done() + }() for { - conn, err := hsv.listener.Accept() - if err != nil { - lg.Errorf("HRPC Accept error: %s\n", err.Error()) - continue - } - if lg.TraceEnabled() { - lg.Tracef("Accepted HRPC connection from %s\n", conn.RemoteAddr()) + select { + case cdc:=<-hsv.cdcs: + conn, err := hsv.listener.Accept() + if err != nil { + lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error()) + hsv.cdcs<-cdc // never blocks; there is always sufficient buffer space + continue + } + if lg.TraceEnabled() { + lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr()) + } + cdc.conn = conn + cdc.numHandled = 0 + if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil { + hsv.testHooks.HandleAdmission() + } + go hsv.ServeCodec(cdc) + case <-hsv.shutdown: + return } - go hsv.ServeCodec(&HrpcServerCodec{ - lg: lg, - conn: conn, - }) } } @@ -270,6 +369,12 @@ func (hsv *HrpcServer) Addr() net.Addr { return hsv.listener.Addr() } +func (hsv *HrpcServer) GetNumIoErrors() uint64 { + return atomic.LoadUint64(&hsv.ioErrorCount) +} + func (hsv *HrpcServer) Close() { + close(hsv.shutdown) hsv.listener.Close() + hsv.exited.Wait() } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go index 97b72ca..5b0dfc6 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/htraced.go @@ -110,7 +110,7 @@ func main() { } var hsv *HrpcServer if 
cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { - hsv, err = CreateHrpcServer(cnf, store) + hsv, err = CreateHrpcServer(cnf, store, nil) if err != nil { lg.Errorf("Error creating HRPC server: %s\n", err.Error()) os.Exit(1) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go index 48c20f0..5243d9e 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/metrics_test.go @@ -205,13 +205,12 @@ func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) { } defer ht.Close() var hcl *htrace.Client - hcl, err = htrace.NewClient(ht.ClientConf()) + hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks { + HrpcDisabled: !usePacked, + }) if err != nil { t.Fatalf("failed to create client: %s", err.Error()) } - if !usePacked { - hcl.DisableHrpc() - } NUM_TEST_SPANS := 12 allSpans := createRandomTestSpans(NUM_TEST_SPANS) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go index a50799a..353beae 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -55,6 +55,9 @@ type MiniHTracedBuilder struct { // If non-null, the WrittenSpans channel to use when creating the DataStore. 
WrittenSpans chan *common.Span + + // The test hooks to use for the HRPC server + HrpcTestHooks *hrpcTestHooks } type MiniHTraced struct { @@ -141,7 +144,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { return nil, err } rstListener = nil - hsv, err = CreateHrpcServer(cnf, store) + hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks) if err != nil { return nil, err } @@ -175,6 +178,7 @@ func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config { func (ht *MiniHTraced) Close() { ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name) ht.Rsv.Close() + ht.Hsv.Close() ht.Store.Close() if !ht.KeepDataDirsOnClose { for idx := range ht.DataDirs { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/c715e12e/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go index 394e1c1..7b5e433 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htracedTool/cmd.go @@ -123,7 +123,7 @@ func main() { } // Create HTrace client - hcl, err := htrace.NewClient(cnf) + hcl, err := htrace.NewClient(cnf, nil) if err != nil { fmt.Printf("Failed to create HTrace client: %s\n", err.Error()) os.Exit(EXIT_FAILURE)
