Repository: incubator-htrace
Updated Branches:
  refs/heads/master 26fdb7cd8 -> 5210d97d2


HTRACE-150. htraced: add HRPC endpoint for writeSpans (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/5210d97d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/5210d97d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/5210d97d

Branch: refs/heads/master
Commit: 5210d97d2ff5c275570a916a1f6758578b23a772
Parents: 26fdb7c
Author: Colin P. Mccabe <[email protected]>
Authored: Wed Apr 22 15:16:55 2015 -0700
Committer: Colin P. Mccabe <[email protected]>
Committed: Wed Apr 22 15:16:55 2015 -0700

----------------------------------------------------------------------
 .../go/src/org/apache/htrace/client/client.go   |  75 ++++++-
 .../go/src/org/apache/htrace/client/hclient.go  | 151 ++++++++++++++
 .../src/go/src/org/apache/htrace/common/rpc.go  |  81 ++++++++
 .../src/org/apache/htrace/conf/config_keys.go   |   9 +-
 .../src/go/src/org/apache/htrace/htrace/cmd.go  |  44 +++--
 .../org/apache/htrace/htraced/client_test.go    |  20 +-
 .../org/apache/htrace/htraced/datastore_test.go |   9 +-
 .../go/src/org/apache/htrace/htraced/hrpc.go    | 197 +++++++++++++++++++
 .../go/src/org/apache/htrace/htraced/htraced.go |  15 ++
 .../org/apache/htrace/htraced/mini_htraced.go   |  14 +-
 10 files changed, 569 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/client.go 
b/htrace-core/src/go/src/org/apache/htrace/client/client.go
index 5e594d9..44e2f69 100644
--- a/htrace-core/src/go/src/org/apache/htrace/client/client.go
+++ b/htrace-core/src/go/src/org/apache/htrace/client/client.go
@@ -35,13 +35,24 @@ import (
 // TODO: fancier APIs for streaming spans in the background, optimize TCP stuff
 
 func NewClient(cnf *conf.Config) (*Client, error) {
-       hcl := Client{restAddr: cnf.Get(conf.HTRACE_WEB_ADDRESS)}
+       hcl := Client{}
+       hcl.restAddr = cnf.Get(conf.HTRACE_WEB_ADDRESS)
+       if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+               var err error
+               hcl.hcr, err = newHClient(cnf)
+               if err != nil {
+                       return nil, err
+               }
+       }
        return &hcl, nil
 }
 
 type Client struct {
        // REST address of the htraced server.
        restAddr string
+
+       // The HRPC client, or null if it is not enabled.
+       hcr *hClient
 }
 
 // Get the htraced server information.
@@ -77,12 +88,42 @@ func (hcl *Client) FindSpan(sid common.SpanId) 
(*common.Span, error) {
        return &span, nil
 }
 
-func (hcl *Client) WriteSpan(span *common.Span) error {
-       buf, err := json.Marshal(span)
-       if err != nil {
-               return err
+func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
+       if hcl.hcr != nil {
+               return hcl.hcr.writeSpans(req)
+       } else {
+               return hcl.writeSpansHttp(req)
        }
-       _, _, err = hcl.makeRestRequest("POST", "writeSpans", 
bytes.NewReader(buf))
+}
+
+func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
+       var w bytes.Buffer
+       var err error
+       for i := range req.Spans {
+               var buf []byte
+               buf, err = json.Marshal(req.Spans[i])
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Error serializing span: 
%s",
+                               err.Error()))
+               }
+               _, err = w.Write(buf)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Error writing span: %s",
+                               err.Error()))
+               }
+               _, err = w.Write([]byte{'\n'})
+               //err = io.WriteString(&w, "\n")
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Error writing: %s",
+                               err.Error()))
+               }
+       }
+       customHeaders := make(map[string]string)
+       if req.DefaultPid != "" {
+               customHeaders["htrace-pid"] = req.DefaultPid
+       }
+       _, _, err = hcl.makeRestRequest("POST", "writeSpans",
+               &w, customHeaders)
        if err != nil {
                return err
        }
@@ -90,7 +131,6 @@ func (hcl *Client) WriteSpan(span *common.Span) error {
 }
 
 // Find the child IDs of a given span ID.
-// TODO: add offset as well as limit?
 func (hcl *Client) FindChildren(sid common.SpanId, lim int) ([]common.SpanId, 
error) {
        buf, _, err := 
hcl.makeGetRequest(fmt.Sprintf("span/%016x/children?lim=%d",
                uint64(sid), lim))
@@ -126,17 +166,24 @@ func (hcl *Client) Query(query *common.Query) 
([]common.Span, error) {
        return spans, nil
 }
 
+var EMPTY = make(map[string]string)
+
 func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
-       return hcl.makeRestRequest("GET", reqName, nil)
+       return hcl.makeRestRequest("GET", reqName, nil, EMPTY)
 }
 
 // Make a general JSON REST request.
 // Returns the request body, the response code, and the error.
 // Note: if the response code is non-zero, the error will also be non-zero.
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody 
io.Reader) ([]byte, int, error) {
-       url := fmt.Sprintf("http://%s/%s";, hcl.restAddr, reqName)
+func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody 
io.Reader,
+       customHeaders map[string]string) ([]byte, int, error) {
+       url := fmt.Sprintf("http://%s/%s";,
+               hcl.restAddr, reqName)
        req, err := http.NewRequest(reqType, url, reqBody)
        req.Header.Set("Content-Type", "application/json")
+       for k, v := range customHeaders {
+               req.Header.Set(k, v)
+       }
        client := &http.Client{}
        resp, err := client.Do(req)
        if err != nil {
@@ -187,3 +234,11 @@ func (hcl *Client) DumpAll(lim int, out chan *common.Span) 
error {
                searchId = spans[len(spans)-1].Id + 1
        }
 }
+
+func (hcl *Client) Close() {
+       if hcl.hcr != nil {
+               hcl.hcr.Close()
+       }
+       hcl.restAddr = ""
+       hcl.hcr = nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/client/hclient.go 
b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
new file mode 100644
index 0000000..1730c02
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/client/hclient.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+       "encoding/binary"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "net"
+       "net/rpc"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+)
+
+type hClient struct {
+       rpcClient *rpc.Client
+}
+
+type HrpcClientCodec struct {
+       rwc    io.ReadWriteCloser
+       length uint32
+}
+
+func (cdc *HrpcClientCodec) WriteRequest(req *rpc.Request, msg interface{}) 
error {
+       methodId := common.HrpcMethodNameToId(req.ServiceMethod)
+       if methodId == common.METHOD_ID_NONE {
+               return errors.New(fmt.Sprintf("HrpcClientCodec: Unknown method 
name %s",
+                       req.ServiceMethod))
+       }
+       buf, err := json.Marshal(msg)
+       if err != nil {
+               return errors.New(fmt.Sprintf("HrpcClientCodec: Unable to 
marshal "+
+                       "message as JSON: %s", err.Error()))
+       }
+       if len(buf) > common.MAX_HRPC_BODY_LENGTH {
+               return errors.New(fmt.Sprintf("HrpcClientCodec: message body is 
%d "+
+                       "bytes, but the maximum message size is %d bytes.",
+                       len(buf), common.MAX_HRPC_BODY_LENGTH))
+       }
+       hdr := common.HrpcRequestHeader{
+               Magic:    common.HRPC_MAGIC,
+               MethodId: methodId,
+               Seq:      req.Seq,
+               Length:   uint32(len(buf)),
+       }
+       err = binary.Write(cdc.rwc, binary.BigEndian, &hdr)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Error writing header bytes: %s",
+                       err.Error()))
+       }
+       _, err = cdc.rwc.Write(buf)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Error writing body bytes: %s",
+                       err.Error()))
+       }
+       return nil
+}
+
+func (cdc *HrpcClientCodec) ReadResponseHeader(resp *rpc.Response) error {
+       hdr := common.HrpcResponseHeader{}
+       err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Error reading response header "+
+                       "bytes: %s", err.Error()))
+       }
+       resp.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+       if resp.ServiceMethod == "" {
+               return errors.New(fmt.Sprintf("Error reading response header: "+
+                       "invalid method ID %d.", hdr.MethodId))
+       }
+       resp.Seq = hdr.Seq
+       if hdr.ErrLength > 0 {
+               if hdr.ErrLength > common.MAX_HRPC_ERROR_LENGTH {
+                       return errors.New(fmt.Sprintf("Error reading response 
header: "+
+                               "error message was %d bytes long, but "+
+                               "MAX_HRPC_ERROR_LENGTH is %d.",
+                               hdr.ErrLength, common.MAX_HRPC_ERROR_LENGTH))
+               }
+               buf := make([]byte, hdr.ErrLength)
+               var nread int
+               nread, err = cdc.rwc.Read(buf)
+               if uint32(nread) != hdr.ErrLength {
+                       return errors.New(fmt.Sprintf("Error reading response 
header: "+
+                               "failed to read %d bytes of error message.", 
nread))
+               }
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Error reading response 
header: "+
+                               "failed to read %d bytes of error message: %s",
+                               nread, err.Error()))
+               }
+               resp.Error = string(buf)
+       } else {
+               resp.Error = ""
+       }
+       cdc.length = hdr.Length
+       return nil
+}
+
+func (cdc *HrpcClientCodec) ReadResponseBody(body interface{}) error {
+       dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+       err := dec.Decode(body)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Failed to read response body: 
%s",
+                       err.Error()))
+       }
+       return nil
+}
+
+func (cdc *HrpcClientCodec) Close() error {
+       return cdc.rwc.Close()
+}
+
+func newHClient(cnf *conf.Config) (*hClient, error) {
+       hcr := hClient{}
+       addr := cnf.Get(conf.HTRACE_HRPC_ADDRESS)
+       conn, err := net.Dial("tcp", addr)
+       if err != nil {
+               return nil, errors.New(fmt.Sprintf("Error contacting the HRPC 
server "+
+                       "at %s: %s", addr, err.Error()))
+       }
+       hcr.rpcClient = rpc.NewClientWithCodec(&HrpcClientCodec{rwc: conn})
+       return &hcr, nil
+}
+
+func (hcr *hClient) writeSpans(req *common.WriteSpansReq) error {
+       resp := common.WriteSpansResp{}
+       return hcr.rpcClient.Call(common.METHOD_NAME_WRITE_SPANS, req, &resp)
+}
+
+func (hcr *hClient) Close() {
+       hcr.rpcClient.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/common/rpc.go 
b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
new file mode 100644
index 0000000..cdf7e08
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/common/rpc.go
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package common
+
+// The 4-byte magic number which is sent first in the HRPC header
+const HRPC_MAGIC = 0x48545243
+
+// Method ID codes.  Do not reorder these.
+const (
+       METHOD_ID_NONE        = 0
+       METHOD_ID_WRITE_SPANS = iota
+)
+
+const METHOD_NAME_WRITE_SPANS = "HrpcHandler.WriteSpans"
+
+// Maximum length of the error message passed in an HRPC response
+const MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024
+
+// Maximum length of HRPC message body
+const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
+
+// A request to write spans to htraced.
+type WriteSpansReq struct {
+       DefaultPid string
+       Spans      []*Span
+}
+
+// A response to a WriteSpansReq
+type WriteSpansResp struct {
+}
+
+// The header which is sent over the wire for HRPC
+type HrpcRequestHeader struct {
+       Magic    uint32
+       MethodId uint32
+       Seq      uint64
+       Length   uint32
+}
+
+// The response which is sent over the wire for HRPC
+type HrpcResponseHeader struct {
+       Seq       uint64
+       MethodId  uint32
+       ErrLength uint32
+       Length    uint32
+}
+
+func HrpcMethodIdToMethodName(id uint32) string {
+       switch id {
+       case METHOD_ID_WRITE_SPANS:
+               return METHOD_NAME_WRITE_SPANS
+       default:
+               return ""
+       }
+}
+
+func HrpcMethodNameToId(name string) uint32 {
+       switch name {
+       case METHOD_NAME_WRITE_SPANS:
+               return METHOD_ID_WRITE_SPANS
+       default:
+               return METHOD_ID_NONE
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go 
b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
index ba63f2d..ccb09e0 100644
--- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
+++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go
@@ -47,6 +47,12 @@ const HTRACE_WEB_ADDRESS = "web.address"
 // The default port for the Htrace web address.
 const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9095
 
+// The web address to start the REST server on.
+const HTRACE_HRPC_ADDRESS = "hrpc.address"
+
+// The default port for the Htrace HRPC address.
+const HTRACE_HRPC_ADDRESS_DEFAULT_PORT = 9075
+
 // The directories to put the data store into.  Separated by PATH_LIST_SEP.
 const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories"
 
@@ -69,7 +75,8 @@ const HTRACE_STARTUP_NOTIFICATION_ADDRESS = 
"startup.notification.address"
 
 // Default values for HTrace configuration keys.
 var DEFAULTS = map[string]string{
-       HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", 
HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+       HTRACE_WEB_ADDRESS:  fmt.Sprintf("0.0.0.0:%d", 
HTRACE_WEB_ADDRESS_DEFAULT_PORT),
+       HTRACE_HRPC_ADDRESS: fmt.Sprintf("0.0.0.0:%d", 
HTRACE_HRPC_ADDRESS_DEFAULT_PORT),
        HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" +
                PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2",
        HTRACE_DATA_STORE_CLEAR:            "false",

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go 
b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
index 0317237..38cdb58 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go
@@ -21,6 +21,7 @@ package main
 
 import (
        "bufio"
+       "bytes"
        "encoding/json"
        "errors"
        "fmt"
@@ -203,8 +204,17 @@ func doLoadSpanJsonFile(hcl *htrace.Client, spanFile 
string) int {
                return EXIT_FAILURE
        }
        defer file.Close()
-       in := bufio.NewReader(file)
-       dec := json.NewDecoder(in)
+       return doLoadSpans(hcl, bufio.NewReader(file))
+}
+
+func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
+       return doLoadSpans(hcl, bytes.NewBufferString(spanJson))
+}
+
+func doLoadSpans(hcl *htrace.Client, reader io.Reader) int {
+       dec := json.NewDecoder(reader)
+       spans := make([]*common.Span, 0, 32)
+       var err error
        for {
                var span common.Span
                if err = dec.Decode(&span); err != nil {
@@ -214,26 +224,20 @@ func doLoadSpanJsonFile(hcl *htrace.Client, spanFile 
string) int {
                        fmt.Printf("Failed to decode JSON: %s\n", err.Error())
                        return EXIT_FAILURE
                }
-               if *verbose {
-                       fmt.Printf("wrote %s\n", span.ToJson())
-               }
-               if err = hcl.WriteSpan(&span); err != nil {
-                       fmt.Println(err.Error())
-                       return EXIT_FAILURE
-               }
+               spans = append(spans, &span)
        }
-       return EXIT_SUCCESS
-}
-
-func doLoadSpanJson(hcl *htrace.Client, spanJson string) int {
-       spanBytes := []byte(spanJson)
-       var span common.Span
-       err := json.Unmarshal(spanBytes, &span)
-       if err != nil {
-               fmt.Printf("Error parsing provided JSON: %s\n", err.Error())
-               return EXIT_FAILURE
+       if *verbose {
+               fmt.Printf("Writing ")
+               prefix := ""
+               for i := range spans {
+                       fmt.Printf("%s%s", prefix, spans[i].ToJson())
+                       prefix = ", "
+               }
+               fmt.Printf("\n")
        }
-       err = hcl.WriteSpan(&span)
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: spans,
+       })
        if err != nil {
                fmt.Println(err.Error())
                return EXIT_FAILURE

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
index f9e66ce..218c1c8 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/client_test.go
@@ -79,10 +79,12 @@ func TestClientOperations(t *testing.T) {
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
 
        // Write half of the spans to htraced via the client.
-       for i := 0; i < NUM_TEST_SPANS/2; i++ {
-               if err := hcl.WriteSpan(allSpans[i]); err != nil {
-                       t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error())
-               }
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans[0 : NUM_TEST_SPANS/2],
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans(0:%d) failed: %s\n", NUM_TEST_SPANS/2,
+                       err.Error())
        }
 
        // Look up the first half of the spans.  They should be found.
@@ -165,11 +167,11 @@ func TestDumpAll(t *testing.T) {
        NUM_TEST_SPANS := 100
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
        sort.Sort(allSpans)
-       for i := range allSpans {
-               err = hcl.WriteSpan(allSpans[i])
-               if err != nil {
-                       t.Fatalf("failed to write span %d: %s", i, err.Error())
-               }
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans,
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }
        out := make(chan *common.Span, 50)
        var dumpErr error

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
index 77497c5..79a7c4f 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go
@@ -374,10 +374,11 @@ func TestReloadDataStore(t *testing.T) {
        // Create some random trace spans.
        NUM_TEST_SPANS := 5
        allSpans := createRandomTestSpans(NUM_TEST_SPANS)
-       for i := 0; i < NUM_TEST_SPANS; i++ {
-               if err := hcl.WriteSpan(allSpans[i]); err != nil {
-                       t.Fatalf("WriteSpan(%d) failed: %s\n", i, err.Error())
-               }
+       err = hcl.WriteSpans(&common.WriteSpansReq{
+               Spans: allSpans,
+       })
+       if err != nil {
+               t.Fatalf("WriteSpans failed: %s\n", err.Error())
        }
 
        // Look up the spans we wrote.

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
new file mode 100644
index 0000000..9696cbc
--- /dev/null
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/hrpc.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+       "bufio"
+       "encoding/binary"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "net"
+       "net/rpc"
+       "org/apache/htrace/common"
+       "org/apache/htrace/conf"
+)
+
+// Handles HRPC calls
+type HrpcHandler struct {
+       lg    *common.Logger
+       store *dataStore
+}
+
+// The HRPC server
+type HrpcServer struct {
+       *rpc.Server
+       hand     *HrpcHandler
+       listener net.Listener
+}
+
+// Codec which encodes HRPC data via JSON
+type HrpcServerCodec struct {
+       rwc    io.ReadWriteCloser
+       length uint32
+}
+
+func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error {
+       hdr := common.HrpcRequestHeader{}
+       err := binary.Read(cdc.rwc, binary.BigEndian, &hdr)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Error reading header bytes: %s", 
err.Error()))
+       }
+       if hdr.Magic != common.HRPC_MAGIC {
+               return errors.New(fmt.Sprintf("Invalid request header: expected 
"+
+                       "magic number of 0x%04x, but got 0x%04x", 
common.HRPC_MAGIC, hdr.Magic))
+       }
+       if hdr.Length > common.MAX_HRPC_BODY_LENGTH {
+               return errors.New(fmt.Sprintf("Length prefix was too long.  
Maximum "+
+                       "length is %d, but we got %d.", 
common.MAX_HRPC_BODY_LENGTH, hdr.Length))
+       }
+       req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId)
+       if req.ServiceMethod == "" {
+               return errors.New(fmt.Sprintf("Unknown MethodID code 0x%04x",
+                       hdr.MethodId))
+       }
+       req.Seq = hdr.Seq
+       cdc.length = hdr.Length
+       return nil
+}
+
+func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error {
+       dec := json.NewDecoder(io.LimitReader(cdc.rwc, int64(cdc.length)))
+       err := dec.Decode(body)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Failed to read request body: %s",
+                       err.Error()))
+       }
+       return nil
+}
+
+var EMPTY []byte = make([]byte, 0)
+
+func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) 
error {
+       var err error
+       buf := EMPTY
+       if msg != nil {
+               buf, err = json.Marshal(msg)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Failed to marshal 
response message: %s",
+                               err.Error()))
+               }
+       }
+       hdr := common.HrpcResponseHeader{}
+       hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod)
+       hdr.Seq = resp.Seq
+       hdr.ErrLength = uint32(len(resp.Error))
+       hdr.Length = uint32(len(buf))
+       writer := bufio.NewWriterSize(cdc.rwc, 256)
+       err = binary.Write(writer, binary.BigEndian, &hdr)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Failed to write response header: 
%s",
+                       err.Error()))
+       }
+       if hdr.ErrLength > 0 {
+               _, err = io.WriteString(writer, resp.Error)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Failed to write error 
string: %s",
+                               err.Error()))
+               }
+       }
+       if hdr.Length > 0 {
+               var length int
+               length, err = writer.Write(buf)
+               if err != nil {
+                       return errors.New(fmt.Sprintf("Failed to write response 
"+
+                               "message: %s", err.Error()))
+               }
+               if uint32(length) != hdr.Length {
+                       return errors.New(fmt.Sprintf("Failed to write all of 
response "+
+                               "message: %s", err.Error()))
+               }
+       }
+       err = writer.Flush()
+       if err != nil {
+               return errors.New(fmt.Sprintf("Failed to write the response 
bytes: "+
+                       "%s", err.Error()))
+       }
+       return nil
+}
+
+func (cdc *HrpcServerCodec) Close() error {
+       return cdc.rwc.Close()
+}
+
+func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq,
+       resp *common.WriteSpansResp) (err error) {
+       hand.lg.Debugf("hrpc writeSpansHandler: received %d span(s).  "+
+               "defaultPid = %s\n", len(req.Spans), req.DefaultPid)
+       for i := range req.Spans {
+               span := req.Spans[i]
+               if span.ProcessId == "" {
+                       span.ProcessId = req.DefaultPid
+               }
+               hand.lg.Tracef("writing span %d: %s\n", i, span.ToJson())
+               hand.store.WriteSpan(span)
+       }
+       return nil
+}
+
+func CreateHrpcServer(cnf *conf.Config, store *dataStore) (*HrpcServer, error) 
{
+       lg := common.NewLogger("hrpc", cnf)
+       hsv := &HrpcServer{
+               Server: rpc.NewServer(),
+               hand: &HrpcHandler{
+                       lg:    lg,
+                       store: store,
+               },
+       }
+       var err error
+       hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS))
+       if err != nil {
+               return nil, err
+       }
+       hsv.Server.Register(hsv.hand)
+       go hsv.run()
+       lg.Infof("Started HRPC server on %s...\n", hsv.listener.Addr().String())
+       return hsv, nil
+}
+
+func (hsv *HrpcServer) run() {
+       lg := hsv.hand.lg
+       for {
+               conn, err := hsv.listener.Accept()
+               if err != nil {
+                       lg.Errorf("HRPC Accept error: %s\n", err.Error())
+                       continue
+               }
+               go hsv.ServeCodec(&HrpcServerCodec{
+                       rwc: conn,
+               })
+       }
+}
+
+func (hsv *HrpcServer) Addr() net.Addr {
+       return hsv.listener.Addr()
+}
+
+func (hsv *HrpcServer) Close() {
+       hsv.listener.Close()
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
index c3432b4..64da457 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go
@@ -78,12 +78,26 @@ func main() {
                lg.Errorf("Error creating REST server: %s\n", err.Error())
                os.Exit(1)
        }
+       var hsv *HrpcServer
+       if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" {
+               hsv, err = CreateHrpcServer(cnf, store)
+               if err != nil {
+                       lg.Errorf("Error creating HRPC server: %s\n", 
err.Error())
+                       os.Exit(1)
+               }
+       } else {
+               lg.Infof("Not starting HRPC server because no value was given 
for %s.\n",
+                       conf.HTRACE_HRPC_ADDRESS)
+       }
        naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS)
        if naddr != "" {
                notif := StartupNotification{
                        HttpAddr:  rsv.Addr().String(),
                        ProcessId: os.Getpid(),
                }
+               if hsv != nil {
+                       notif.HrpcAddr = hsv.Addr().String()
+               }
                err = sendStartupNotification(naddr, &notif)
                if err != nil {
                        fmt.Fprintf(os.Stderr, "Failed to send startup 
notification: "+
@@ -100,6 +114,7 @@ func main() {
 // Used by unit tests.
 type StartupNotification struct {
        HttpAddr  string
+       HrpcAddr  string
        ProcessId int
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5210d97d/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
----------------------------------------------------------------------
diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
index be7e284..a54f2cb 100644
--- a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
+++ b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go
@@ -62,6 +62,7 @@ type MiniHTraced struct {
        DataDirs            []string
        Store               *dataStore
        Rsv                 *RestServer
+       Hsv                 *HrpcServer
        Lg                  *common.Logger
        KeepDataDirsOnClose bool
 }
@@ -70,6 +71,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
        var err error
        var store *dataStore
        var rsv *RestServer
+       var hsv *HrpcServer
        if bld.Name == "" {
                bld.Name = "HTraceTest"
        }
@@ -90,7 +92,8 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) {
        }
        bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] =
                strings.Join(bld.DataDirs, conf.PATH_LIST_SEP)
-       bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0" // use a random port for the 
REST server
+       bld.Cnf[conf.HTRACE_WEB_ADDRESS] = ":0"  // use a random port for the 
REST server
+       bld.Cnf[conf.HTRACE_HRPC_ADDRESS] = ":0" // use a random port for the 
HRPC server
        bld.Cnf[conf.HTRACE_LOG_LEVEL] = "TRACE"
        cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS}
        cnf, err := cnfBld.Build()
@@ -123,6 +126,11 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, 
error) {
        if err != nil {
                return nil, err
        }
+       hsv, err = CreateHrpcServer(cnf, store)
+       if err != nil {
+               return nil, err
+       }
+
        lg.Infof("Created MiniHTraced %s\n", bld.Name)
        return &MiniHTraced{
                Name:                bld.Name,
@@ -130,6 +138,7 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, 
error) {
                DataDirs:            bld.DataDirs,
                Store:               store,
                Rsv:                 rsv,
+               Hsv:                 hsv,
                Lg:                  lg,
                KeepDataDirsOnClose: bld.KeepDataDirsOnClose,
        }, nil
@@ -137,7 +146,8 @@ func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, 
error) {
 
 // Return a Config object that clients can use to connect to this MiniHTraceD.
 func (ht *MiniHTraced) ClientConf() *conf.Config {
-       return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String())
+       return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(),
+               conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String())
 }
 
 func (ht *MiniHTraced) Close() {

Reply via email to