HTRACE-237. Optimize htraced span receiver (cmccabe)

Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/cb2351d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/cb2351d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/cb2351d2

Branch: refs/heads/master
Commit: cb2351d292663582c84885ce899f2777b81ecd30
Parents: 43ce213
Author: Colin P. Mccabe <[email protected]>
Authored: Mon Oct 12 13:09:41 2015 -0700
Committer: Colin P. Mccabe <[email protected]>
Committed: Mon Oct 12 13:13:43 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/htrace/core/Span.java  |   8 +-
 .../java/org/apache/htrace/util/TestUtil.java   |  75 +++
 .../go/src/org/apache/htrace/client/client.go   |  42 +-
 .../go/src/org/apache/htrace/common/rest.go     |  45 --
 .../go/src/org/apache/htrace/common/rpc.go      |  27 +-
 .../src/org/apache/htrace/common/span_test.go   |   2 +-
 .../go/src/org/apache/htrace/conf/config.go     |   4 +-
 .../go/src/org/apache/htrace/htrace/cmd.go      |   6 +-
 .../src/org/apache/htrace/htraced/datastore.go  |  14 +-
 .../go/src/org/apache/htrace/htraced/rest.go    |  41 +-
 htrace-htraced/pom.xml                          |  15 +-
 .../org/apache/htrace/impl/BufferManager.java   |  74 +++
 .../main/java/org/apache/htrace/impl/Conf.java  | 356 ++++++++++++
 .../apache/htrace/impl/HTracedRESTReceiver.java | 431 --------------
 .../apache/htrace/impl/HTracedSpanReceiver.java | 333 +++++++++++
 .../org/apache/htrace/impl/PackedBuffer.java    | 449 +++++++++++++++
 .../apache/htrace/impl/PackedBufferManager.java | 340 +++++++++++
 .../apache/htrace/impl/RateLimitedLogger.java   |  72 +++
 .../apache/htrace/impl/RestBufferManager.java   | 225 ++++++++
 .../java/org/apache/htrace/impl/TimeUtil.java   |  78 +++
 .../java/org/apache/htrace/impl/DataDir.java    |  58 ++
 .../org/apache/htrace/impl/HTracedProcess.java  | 277 +++++++++
 .../htrace/impl/TestHTracedRESTReceiver.java    | 215 -------
 .../apache/htrace/impl/TestHTracedReceiver.java | 572 +++++++++++++++++++
 .../htrace/impl/TestHTracedReceiverConf.java    |  91 +++
 .../apache/htrace/impl/TestPackedBuffer.java    |  76 +++
 .../org/apache/htrace/impl/TestTimeUtil.java    |  46 ++
 .../java/org/apache/htrace/util/DataDir.java    |  97 ----
 .../org/apache/htrace/util/HTracedProcess.java  | 172 ------
 .../apache/htrace/util/TestHTracedProcess.java  | 100 ----
 30 files changed, 3199 insertions(+), 1142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
----------------------------------------------------------------------
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
index e63d414..33908db 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java
@@ -17,6 +17,7 @@
 package org.apache.htrace.core;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -170,8 +171,11 @@ public interface Span {
       Map<String, String> traceInfoMap = span.getKVAnnotations();
       if (!traceInfoMap.isEmpty()) {
         jgen.writeObjectFieldStart("n");
-        for (Map.Entry<String, String> e : traceInfoMap.entrySet()) {
-          jgen.writeStringField(e.getKey(), e.getValue());
+        String[] keys = traceInfoMap.keySet().
+            toArray(new String[traceInfoMap.size()]);
+        Arrays.sort(keys);
+        for (String key : keys) {
+          jgen.writeStringField(key, traceInfoMap.get(key));
         }
         jgen.writeEndObject();
       }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
index 7cb4aed..0869ca0 100644
--- a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
+++ b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java
@@ -16,8 +16,18 @@
  */
 package org.apache.htrace.util;
 
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+import org.apache.htrace.core.TimelineAnnotation;
+
 import java.io.File;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -88,4 +98,69 @@ public class TestUtil {
       Thread.sleep(periodMs);
     }
   }
+
+  private static long nonZeroRandomLong(Random rand) {
+    long r = 0;
+    do {
+      r = rand.nextLong();
+    } while (r == 0);
+    return r;
+  }
+
+  private static long positiveRandomLong(Random rand) {
+    long r = rand.nextLong();
+    if (r == Long.MIN_VALUE) {
+      // Math.abs can't handle this case
+      return Long.MAX_VALUE;
+    } else if (r > 0) {
+      return r;
+    } else {
+      return -r;
+    }
+  }
+
+  private static String randomString(Random rand) {
+    return new UUID(positiveRandomLong(rand),
+          positiveRandomLong(rand)).toString();
+  }
+
+  public static Span randomSpan(Random rand) {
+    MilliSpan.Builder builder = new MilliSpan.Builder();
+    builder.spanId(
+          new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand)));
+    builder.begin(positiveRandomLong(rand));
+    builder.end(positiveRandomLong(rand));
+    builder.description(randomString(rand));
+    builder.tracerId(randomString(rand));
+    int numParents = rand.nextInt(4);
+    SpanId[] parents = new SpanId[numParents];
+    for (int i = 0; i < numParents; i++) {
+      parents[i] =
+          new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand));
+    }
+    builder.parents(parents);
+    int numTraceInfos = rand.nextInt(4);
+    Map<String, String> traceInfo = new HashMap<String, String>(numTraceInfos);
+    for (int i = 0; i < numTraceInfos; i++) {
+      traceInfo.put(randomString(rand), randomString(rand));
+    }
+    builder.traceInfo(traceInfo);
+    int numTimelineAnnotations = rand.nextInt(4);
+    List<TimelineAnnotation> timeline =
+        new LinkedList<TimelineAnnotation>();
+    for (int i = 0; i < numTimelineAnnotations; i++) {
+      timeline.add(new TimelineAnnotation(positiveRandomLong(rand),
+            randomString(rand)));
+    }
+    builder.timeline(timeline);
+    return builder.build();
+  }
+
+  public static Span[] randomSpans(Random rand, int numSpans) {
+    Span[] spans = new Span[numSpans];
+    for (int i = 0; i < spans.length; i++) {
+      spans[i] = randomSpan(rand);
+    }
+    return spans;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/client/client.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go
index 2ac8a1e..dd3f8a3 100644
--- a/htrace-htraced/go/src/org/apache/htrace/client/client.go
+++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go
@@ -114,32 +114,13 @@ func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error {
 
 func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error {
        var w bytes.Buffer
-       var err error
-       for i := range req.Spans {
-               var buf []byte
-               buf, err = json.Marshal(req.Spans[i])
-               if err != nil {
-                       return errors.New(fmt.Sprintf("Error serializing span: %s",
-                               err.Error()))
-               }
-               _, err = w.Write(buf)
-               if err != nil {
-                       return errors.New(fmt.Sprintf("Error writing span: %s",
-                               err.Error()))
-               }
-               _, err = w.Write([]byte{'\n'})
-               //err = io.WriteString(&w, "\n")
-               if err != nil {
-                       return errors.New(fmt.Sprintf("Error writing: %s",
-                               err.Error()))
-               }
-       }
-       customHeaders := make(map[string]string)
-       if req.DefaultTrid != "" {
-               customHeaders["htrace-trid"] = req.DefaultTrid
+       enc := json.NewEncoder(&w)
+       err := enc.Encode(req)
+       if err != nil {
+               return errors.New(fmt.Sprintf("Error serializing span: %s",
+                       err.Error()))
        }
-       _, _, err = hcl.makeRestRequest("POST", "writeSpans",
-               &w, customHeaders)
+       _, _, err = hcl.makeRestRequest("POST", "writeSpans", &w)
        if err != nil {
                return err
        }
@@ -182,24 +163,19 @@ func (hcl *Client) Query(query *common.Query) ([]common.Span, error) {
        return spans, nil
 }
 
-var EMPTY = make(map[string]string)
-
 func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) {
-       return hcl.makeRestRequest("GET", reqName, nil, EMPTY)
+       return hcl.makeRestRequest("GET", reqName, nil)
 }
 
 // Make a general JSON REST request.
 // Returns the request body, the response code, and the error.
 // Note: if the response code is non-zero, the error will also be non-zero.
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader,
-       customHeaders map[string]string) ([]byte, int, error) {
+func (hcl *Client) makeRestRequest(reqType string, reqName string,
+       reqBody io.Reader) ([]byte, int, error) {
        url := fmt.Sprintf("http://%s/%s",
                hcl.restAddr, reqName)
        req, err := http.NewRequest(reqType, url, reqBody)
        req.Header.Set("Content-Type", "application/json")
-       for k, v := range customHeaders {
-               req.Header.Set(k, v)
-       }
        client := &http.Client{}
        resp, err := client.Do(req)
        if err != nil {

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/common/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rest.go b/htrace-htraced/go/src/org/apache/htrace/common/rest.go
deleted file mode 100644
index b367ed1..0000000
--- a/htrace-htraced/go/src/org/apache/htrace/common/rest.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package common
-
-// Info returned by /server/info
-type ServerInfo struct {
-       // The server release version.
-       ReleaseVersion string
-
-       // The git hash that this software was built with.
-       GitVersion string
-}
-
-// Info returned by /server/stats
-type ServerStats struct {
-       Shards []ShardStats
-}
-
-type ShardStats struct {
-       Path string
-
-       // The approximate number of spans present in this shard.  This may be an
-       // underestimate.
-       ApproxNumSpans uint64
-
-       // leveldb.stats information
-       LevelDbStats string
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
index 28521a5..9c7bfad 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go
@@ -38,14 +38,39 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024
 
 // A request to write spans to htraced.
 type WriteSpansReq struct {
-       DefaultTrid string
+       DefaultTrid string `json:",omitempty"`
        Spans       []*Span
 }
 
+// Info returned by /server/info
+type ServerInfo struct {
+       // The server release version.
+       ReleaseVersion string
+
+       // The git hash that this software was built with.
+       GitVersion string
+}
+
 // A response to a WriteSpansReq
 type WriteSpansResp struct {
 }
 
+// Info returned by /server/stats
+type ServerStats struct {
+       Shards []ShardStats
+}
+
+type ShardStats struct {
+       Path string
+
+       // The approximate number of spans present in this shard.  This may be an
+       // underestimate.
+       ApproxNumSpans uint64
+
+       // leveldb.stats information
+       LevelDbStats string
+}
+
 // The header which is sent over the wire for HRPC
 type HrpcRequestHeader struct {
        Magic    uint32

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
index 9de7cee..7fb128d 100644
--- a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
+++ b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go
@@ -92,7 +92,7 @@ func TestSpanMsgPack(t *testing.T) {
                        End:         5678,
                        Description: "getFileDescriptors",
                        Parents:     []SpanId{},
-                       TracerId:   "testTracerId",
+                       TracerId:    "testTracerId",
                }}
        mh := new(codec.MsgpackHandle)
        mh.WriteExt = true

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/conf/config.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
index b8f6c84..76af7a6 100644
--- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go
+++ b/htrace-htraced/go/src/org/apache/htrace/conf/config.go
@@ -69,7 +69,7 @@ type Builder struct {
 }
 
 func getDefaultHTracedConfDir() string {
-       return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf";
+       return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf"
 }
 
 func getHTracedConfDirs(dlog io.Writer) []string {
@@ -78,7 +78,7 @@ func getHTracedConfDirs(dlog io.Writer) []string {
        if len(paths) < 1 {
                def := getDefaultHTracedConfDir()
                io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def))
-               return []string{ def }
+               return []string{def}
        }
        io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir))
        return paths

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
index 8bc0c64..b5549c5 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go
@@ -31,8 +31,8 @@ import (
        "org/apache/htrace/common"
        "org/apache/htrace/conf"
        "os"
-       "time"
        "strings"
+       "time"
 )
 
 var RELEASE_VERSION string
@@ -126,7 +126,7 @@ func main() {
        case serverInfo.FullCommand():
                os.Exit(printServerInfo(hcl))
        case serverStats.FullCommand():
-               if (*serverStatsJson) {
+               if *serverStatsJson {
                        os.Exit(printServerStatsJson(hcl))
                } else {
                        os.Exit(printServerStats(hcl))
@@ -195,7 +195,7 @@ func printServerStats(hcl *htrace.Client) int {
        }
        fmt.Printf("HTraced server stats:\n")
        fmt.Printf("%d leveldb shards.\n", len(stats.Shards))
-       for i := range(stats.Shards) {
+       for i := range stats.Shards {
                shard := stats.Shards[i]
                fmt.Printf("==== %s ===\n", shard.Path)
                fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans)

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
index 0595d36..9fb9920 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go
@@ -953,19 +953,19 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error)
 }
 
 func (store *dataStore) ServerStats() *common.ServerStats {
-       serverStats := common.ServerStats {
-               Shards : make([]common.ShardStats, len(store.shards)),
+       serverStats := common.ServerStats{
+               Shards: make([]common.ShardStats, len(store.shards)),
        }
-       for shardIdx := range(store.shards) {
+       for shardIdx := range store.shards {
                shard := store.shards[shardIdx]
                serverStats.Shards[shardIdx].Path = shard.path
-               r := levigo.Range {
-                       Start : append([]byte{SPAN_ID_INDEX_PREFIX},
+               r := levigo.Range{
+                       Start: append([]byte{SPAN_ID_INDEX_PREFIX},
                                common.INVALID_SPAN_ID.Val()...),
-                       Limit : append([]byte{SPAN_ID_INDEX_PREFIX + 1},
+                       Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1},
                                common.INVALID_SPAN_ID.Val()...),
                }
-               vals := shard.ldb.GetApproximateSizes([]levigo.Range { r })
+               vals := shard.ldb.GetApproximateSizes([]levigo.Range{r})
                serverStats.Shards[shardIdx].ApproxNumSpans = vals[0]
                serverStats.Shards[shardIdx].LevelDbStats =
                        shard.ldb.PropertyValue("leveldb.stats")

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
index 97b2bca..cbfc508 100644
--- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
+++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go
@@ -193,37 +193,24 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
        } else {
                dec = json.NewDecoder(req.Body)
        }
-       spans := make([]*common.Span, 0, 32)
-       defaultTrid := req.Header.Get("htrace-trid")
-       for {
-               var span common.Span
-               err := dec.Decode(&span)
-               if err != nil {
-                       if err != io.EOF {
-                               writeError(hand.lg, w, http.StatusBadRequest,
-                                       fmt.Sprintf("Error parsing spans: %s", err.Error()))
-                               return
-                       }
-                       break
-               }
-               spanIdProblem := span.Id.FindProblem()
-               if spanIdProblem != "" {
-                       writeError(hand.lg, w, http.StatusBadRequest,
-                               fmt.Sprintf("Invalid span ID: %s", spanIdProblem))
-                       return
-               }
-               if span.TracerId == "" {
-                       span.TracerId = defaultTrid
-               }
-               spans = append(spans, &span)
+       var msg common.WriteSpansReq
+       err := dec.Decode(&msg)
+       if (err != nil) && (err != io.EOF) {
+               writeError(hand.lg, w, http.StatusBadRequest,
+                       fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error()))
+               return
        }
        hand.lg.Debugf("writeSpansHandler: received %d span(s).  defaultTrid = %s\n",
-               len(spans), defaultTrid)
-       for spanIdx := range spans {
+               len(msg.Spans), msg.DefaultTrid)
+       for spanIdx := range msg.Spans {
                if hand.lg.DebugEnabled() {
-                       hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson())
+                       hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson())
+               }
+               span := msg.Spans[spanIdx]
+               if span.TracerId == "" {
+                       span.TracerId = msg.DefaultTrid
                }
-               hand.store.WriteSpan(spans[spanIdx])
+               hand.store.WriteSpan(span)
        }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/pom.xml
----------------------------------------------------------------------
diff --git a/htrace-htraced/pom.xml b/htrace-htraced/pom.xml
index 88fd2fc..4478dc5 100644
--- a/htrace-htraced/pom.xml
+++ b/htrace-htraced/pom.xml
@@ -86,6 +86,10 @@ language governing permissions and limitations under the License. -->
                   <pattern>org.eclipse.jetty</pattern>
                   <shadedPattern>org.apache.htrace.shaded.jetty</shadedPattern>
                 </relocation>
+                <dependency>
+                  <pattern>org.msgpack</pattern>
+                  <shadedPattern>org.apache.htrace.msgpack</shadedPattern>
+                </dependency>
               </relocations>
             </configuration>
             <goals>
@@ -200,12 +204,11 @@ language governing permissions and limitations under the License. -->
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
-    <!-- htraced rest client deps. -->
-    <!--Is this too much? Pulls down jetty-http, jetty-server, jetty-io
-     This is new-style jetty client, jetty9 and jdk7 required.
-     It can do async but we will use it synchronously at first.
-     Has nice tutorial: http://www.eclipse.org/jetty/documentation/9.1.5.v20140505/http-client-api.html
-     -->
+    <dependency>
+      <groupId>org.msgpack</groupId>
+      <artifactId>msgpack-core</artifactId>
+      <version>0.7.0-p9</version>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-client</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
new file mode 100644
index 0000000..29816eb
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+
+import org.apache.htrace.core.Span;
+
+/**
+ * A buffer which contains span data and is able to send it over the network.
+ *
+ * BufferManager functions are not thread-safe.  You must rely on external
+ * synchronization to protect buffers from concurrent operations.
+ */
+interface BufferManager {
+  /**
+   * Write a span to this buffer.
+   *
+   * @param span            The span to write.
+   *
+   * @throws IOException    If the buffer doesn't have enough space to hold the
+   *                          new span.  We will not write a partial span to the
+   *                          buffer in this case.
+   */
+  void writeSpan(Span span) throws IOException;
+
+  /**
+   * Get the amount of content currently in the buffer.
+   */
+  int contentLength();
+
+  /**
+   * Get the number of spans currently buffered.
+   */
+  int getNumberOfSpans();
+
+  /**
+   * Prepare the buffers to be flushed.
+   */
+  void prepare() throws IOException;
+
+  /**
+   * Flush this buffer to htraced.
+   *
+   * This is a blocking operation which will not return until the buffer is
+   * completely flushed.
+   */
+  void flush() throws IOException;
+
+  /**
+   * Clear the data in this buffer.
+   */
+  void clear();
+
+  /**
+   * Closes the buffer manager and frees all resources.
+   */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
new file mode 100644
index 0000000..cdd176f
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.htrace.core.HTraceConfiguration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The configuration of the HTracedSpanReceiver.
+ *
+ * This class extracts all the relevant configuration information for
+ * HTracedSpanReceiver from the HTraceConfiguration.  It performs parsing and
+ * bounds-checking for the configuration keys.
+ *
+ * It is more efficient to store the configuration as final values in this
+ * structure than to access the HTraceConfiguration object directly.  This is
+ * especially true when the HTraceConfiguration object is a thin shim around a
+ * Hadoop Configuration object, which requires synchronization to access.
+ */
+class Conf {
+  /** Logger used for the warnings emitted when a configured value is clamped. */
+  private static final Log LOG = LogFactory.getLog(Conf.class);
+
+  /**
+   * Address of the htraced server.
+   *
+   * The value is parsed as a host:port pair by parseHostPortPair.  The
+   * constructor falls back to an empty string, which is rejected, so this
+   * key effectively has no default.
+   */
+  final static String ADDRESS_KEY =
+      "htraced.receiver.address";
+
+  /**
+   * The minimum number of milliseconds to wait for a read or write
+   * operation on the network.
+   *
+   * Configured values below IO_TIMEOUT_MS_MIN are raised to that minimum.
+   */
+  final static String IO_TIMEOUT_MS_KEY =
+      "htraced.receiver.io.timeout.ms";
+  final static int IO_TIMEOUT_MS_DEFAULT = 60000;
+  final static int IO_TIMEOUT_MS_MIN = 50;
+
+  /**
+   * The minimum number of milliseconds to wait for a network
+   * connection attempt.
+   *
+   * Configured values below CONNECT_TIMEOUT_MS_MIN are raised to that
+   * minimum.
+   */
+  final static String CONNECT_TIMEOUT_MS_KEY =
+      "htraced.receiver.connect.timeout.ms";
+  final static int CONNECT_TIMEOUT_MS_DEFAULT = 60000;
+  final static int CONNECT_TIMEOUT_MS_MIN = 50;
+
+  /**
+   * The minimum number of milliseconds to keep alive a connection when it's
+   * not in use.
+   *
+   * Configured values below IDLE_TIMEOUT_MS_MIN are raised to that minimum.
+   */
+  final static String IDLE_TIMEOUT_MS_KEY =
+      "htraced.receiver.idle.timeout.ms";
+  final static int IDLE_TIMEOUT_MS_DEFAULT = 60000;
+  final static int IDLE_TIMEOUT_MS_MIN = 0;
+
+  /**
+   * Configure the retry times to use when an attempt to flush spans to
+   * htraced fails.  This is configured as a comma-separated list of delay
+   * times in milliseconds.  If the configured value is empty, no retries
+   * will be made.
+   *
+   * The list is parsed by getIntArray, which skips blank entries.
+   *
+   * NOTE(review): unlike the other keys, this literal has no ".receiver."
+   * component and ends in ".key" -- confirm whether that is intentional.
+   */
+  final static String FLUSH_RETRY_DELAYS_KEY =
+      "htraced.flush.retry.delays.key";
+  final static String FLUSH_RETRY_DELAYS_DEFAULT =
+      "1000,30000";
+
+  /**
+   * The maximum length of time to go in between flush attempts.
+   * Once this time elapses, a flush will be triggered even if we don't
+   * have that many spans buffered.
+   *
+   * Configured values below MAX_FLUSH_INTERVAL_MS_MIN are raised to that
+   * minimum.
+   */
+  final static String MAX_FLUSH_INTERVAL_MS_KEY =
+      "htraced.receiver.max.flush.interval.ms";
+  final static int MAX_FLUSH_INTERVAL_MS_DEFAULT = 60000;
+  final static int MAX_FLUSH_INTERVAL_MS_MIN = 10;
+
+  /**
+   * Whether or not to use msgpack for span serialization.
+   * If this key is false, JSON over REST will be used.
+   * If this key is true, msgpack over custom RPC will be used.
+   */
+  final static String PACKED_KEY =
+      "htraced.receiver.packed";
+  final static boolean PACKED_DEFAULT = true;
+
+  /**
+   * The size of the span buffers.
+   *
+   * The configured value is clamped into
+   * [BUFFER_SIZE_MIN, BUFFER_SIZE_MAX] by the constructor.
+   */
+  final static String BUFFER_SIZE_KEY =
+      "htraced.receiver.buffer.size";
+  final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024;
+  // NOTE(review): this bound is not final, unlike every other bound in this
+  // class -- presumably so that tests can lower it; confirm before changing.
+  static int BUFFER_SIZE_MIN = 4 * 1024 * 1024;
+  // The maximum buffer size should not be longer than
+  // PackedBuffer.MAX_HRPC_BODY_LENGTH.
+  final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024;
+
+  /**
+   * Set the fraction of the span buffer which needs to fill up before we
+   * will automatically trigger a flush.  This is a fraction, not a percentage.
+   * It is between 0 and 1.
+   *
+   * The constructor clamps the configured value into
+   * [BUFFER_SEND_TRIGGER_FRACTION_MIN, 1.0] and uses it to compute
+   * triggerSize.
+   */
+  final static String BUFFER_SEND_TRIGGER_FRACTION_KEY =
+      "htraced.receiver.buffer.send.trigger.fraction";
+  final static double BUFFER_SEND_TRIGGER_FRACTION_DEFAULT = 0.5;
+  final static double BUFFER_SEND_TRIGGER_FRACTION_MIN = 0.1;
+
+  /**
+   * The length of time which receiveSpan should wait for a free spot in a
+   * span buffer before giving up and dropping the span.
+   *
+   * This value is read with a plain getInt in the constructor, so no bounds
+   * are enforced on it.
+   *
+   * NOTE(review): the key literal ("max.buffer.full.retry") does not match
+   * the constant name and ends in ".key" -- confirm whether that is
+   * intentional.
+   */
+  final static String SPAN_DROP_TIMEOUT_MS_KEY =
+      "htraced.max.buffer.full.retry.ms.key";
+  final static int SPAN_DROP_TIMEOUT_MS_DEFAULT = 5000;
+
+  /**
+   * The length of time we should wait between displaying log messages on the
+   * rate-limited loggers.
+   *
+   * The constructor clamps the configured value into [0, Long.MAX_VALUE].
+   */
+  final static String ERROR_LOG_PERIOD_MS_KEY =
+      "htraced.error.log.period.ms";
+  final static long ERROR_LOG_PERIOD_MS_DEFAULT = 30000L;
+
+  // The fields below hold the parsed configuration.  They are all assigned
+  // once, in the constructor, and carry @JsonProperty annotations because
+  // toString() serializes this object with a Jackson ObjectMapper.
+
+  /** Value of IO_TIMEOUT_MS_KEY after bounds checking. */
+  @JsonProperty("ioTimeoutMs")
+  final int ioTimeoutMs;
+
+  /** Value of CONNECT_TIMEOUT_MS_KEY after bounds checking. */
+  @JsonProperty("connectTimeoutMs")
+  final int connectTimeoutMs;
+
+  /** Value of IDLE_TIMEOUT_MS_KEY after bounds checking. */
+  @JsonProperty("idleTimeoutMs")
+  final int idleTimeoutMs;
+
+  /** Value of FLUSH_RETRY_DELAYS_KEY, parsed into one int per list entry. */
+  @JsonProperty("flushRetryDelays")
+  final int[] flushRetryDelays;
+
+  /** Value of MAX_FLUSH_INTERVAL_MS_KEY after bounds checking. */
+  @JsonProperty("maxFlushIntervalMs")
+  final int maxFlushIntervalMs;
+
+  /** Value of PACKED_KEY. */
+  @JsonProperty("packed")
+  final boolean packed;
+
+  /** Value of BUFFER_SIZE_KEY after bounds checking. */
+  @JsonProperty("bufferSize")
+  final int bufferSize;
+
+  /** Value of SPAN_DROP_TIMEOUT_MS_KEY (not bounds checked). */
+  @JsonProperty("spanDropTimeoutMs")
+  final int spanDropTimeoutMs;
+
+  /** Value of ERROR_LOG_PERIOD_MS_KEY after bounds checking. */
+  @JsonProperty("errorLogPeriodMs")
+  final long errorLogPeriodMs;
+
+  /**
+   * bufferSize multiplied by the bounded trigger fraction, truncated to an
+   * int.
+   */
+  @JsonProperty("triggerSize")
+  final int triggerSize;
+
+  /** The raw value of ADDRESS_KEY as read from the configuration. */
+  @JsonProperty("endpointStr")
+  final String endpointStr;
+
+  /** endpointStr parsed into an unresolved socket address. */
+  @JsonProperty("endpoint")
+  final InetSocketAddress endpoint;
+
+  private static int getBoundedInt(final HTraceConfiguration conf,
+        String key, int defaultValue, int minValue, int maxValue) {
+    int val = conf.getInt(key, defaultValue);
+    if (val < minValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using minimum value " +
+          "of " + minValue + " instead.");
+      return minValue;
+    } else if (val > maxValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using maximum value " +
+          "of " + maxValue + " instead.");
+      return maxValue;
+    }
+    return val;
+  }
+
+  /**
+   * Read a long from the configuration and clamp it into
+   * [minValue, maxValue], logging a warning whenever clamping was needed.
+   *
+   * The value is fetched as a string and parsed here.
+   *
+   * @param conf          The configuration to read from.
+   * @param key           The configuration key to look up.
+   * @param defaultValue  The fallback, handed to conf.get in string form.
+   * @param minValue      The smallest value which will be returned.
+   * @param maxValue      The largest value which will be returned.
+   * @return              The configured value, clamped into range.
+   * @throws IllegalArgumentException  If the configured string is not a
+   *                                     valid long.
+   */
+  private static long getBoundedLong(final HTraceConfiguration conf,
+        String key, long defaultValue, long minValue, long maxValue) {
+    String strVal = conf.get(key, Long.toString(defaultValue));
+    long val;
+    try {
+      val = Long.parseLong(strVal);
+    } catch (NumberFormatException nfe) {
+      // Chain the parse failure so the offending text is not lost.
+      throw new IllegalArgumentException("Bad value for '" + key +
+        "': should be long", nfe);
+    }
+    if (val < minValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using minimum value " +
+          "of " + minValue + " instead.");
+      return minValue;
+    } else if (val > maxValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using maximum value " +
+          "of " + maxValue + " instead.");
+      return maxValue;
+    }
+    return val;
+  }
+
+  /**
+   * Read a double from the configuration and clamp it into
+   * [minValue, maxValue], logging a warning whenever clamping was needed.
+   *
+   * The value is fetched as a string and parsed here.
+   *
+   * @param conf          The configuration to read from.
+   * @param key           The configuration key to look up.
+   * @param defaultValue  The fallback, handed to conf.get in string form.
+   * @param minValue      The smallest value which will be returned.
+   * @param maxValue      The largest value which will be returned.
+   * @return              The configured value, clamped into range.
+   * @throws IllegalArgumentException  If the configured string is not a
+   *                                     valid double, or is NaN.
+   */
+  private static double getBoundedDouble(final HTraceConfiguration conf,
+        String key, double defaultValue, double minValue, double maxValue) {
+    String strVal = conf.get(key, Double.toString(defaultValue));
+    double val;
+    try {
+      val = Double.parseDouble(strVal);
+    } catch (NumberFormatException nfe) {
+      // Chain the parse failure so the offending text is not lost.
+      throw new IllegalArgumentException("Bad value for '" + key +
+        "': should be double", nfe);
+    }
+    // Double.parseDouble accepts "NaN", and NaN compares false against both
+    // bounds, so without this check it would slip through unclamped.
+    if (Double.isNaN(val)) {
+      throw new IllegalArgumentException("Bad value for '" + key +
+        "': should be double");
+    }
+    if (val < minValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using minimum value " +
+          "of " + minValue + " instead.");
+      return minValue;
+    }
+    if (val > maxValue) {
+      LOG.warn("Can't set " + key + " to " + val + ".  Using maximum value " +
+          "of " + maxValue + " instead.");
+      return maxValue;
+    }
+    return val;
+  }
+
+  /**
+   * Parse the ":port" suffix of a host:port pair.
+   *
+   * @param portStr   The port text, which must start with a colon.
+   * @return          The port number, between 0 and 65535 inclusive.
+   * @throws IOException  If the text does not start with a colon, if what
+   *                        follows the colon is not a decimal integer, or if
+   *                        the port is out of range.
+   */
+  private static int parseColonPort(String portStr) throws IOException {
+    if (!portStr.startsWith(":")) {
+      throw new IOException("Invalid port string " + portStr);
+    }
+    int port;
+    try {
+      port = Integer.parseInt(portStr.substring(1), 10);
+    } catch (NumberFormatException e) {
+      // Report malformed ports through the declared IOException, so that
+      // callers which wrap IOException also see this failure.
+      throw new IOException("Invalid port string " + portStr, e);
+    }
+    if ((port < 0) || (port > 65535)) {
+      throw new IOException("Invalid port number " + port);
+    }
+    return port;
+  }
+
+  /**
+   * Parse a hostname:port or ip:port pair.
+   *
+   * @param str       The string to parse.
+   * @return          The socket address.
+   */
+  InetSocketAddress parseHostPortPair(String str) throws IOException {
+    str = str.trim();
+    if (str.isEmpty()) {
+      throw new IOException("No hostname:port pair given.");
+    }
+    int bracketBegin = str.indexOf('[');
+    if (bracketBegin == 0) {
+      // Parse an ipv6-style address enclosed in square brackets.
+      int bracketEnd = str.indexOf(']');
+      if (bracketEnd < 0) {
+        throw new IOException("Found left bracket, but no corresponding " +
+            "right bracket, in " + str);
+      }
+      String host = str.substring(bracketBegin + 1, bracketEnd);
+      int port = parseColonPort(str.substring(bracketEnd + 1));
+      return InetSocketAddress.createUnresolved(host, port);
+    } else if (bracketBegin > 0) {
+        throw new IOException("Found a left bracket that wasn't at the " +
+          "start of the host:port pair in " + str);
+    } else {
+      int colon = str.indexOf(':');
+      if (colon <= 0) {
+        throw new IOException("No port component found in " + str);
+      }
+      String host = str.substring(0, colon);
+      int port = parseColonPort(str.substring(colon));
+      return InetSocketAddress.createUnresolved(host, port);
+    }
+  }
+
+  /**
+   * Parse a comma-separated list of integers.
+   *
+   * Whitespace around each entry is ignored, and entries which are empty
+   * after trimming are skipped, so an empty string yields an empty array.
+   *
+   * @param arrayStr  The comma-separated list to parse.
+   * @return          One int per non-empty entry, in list order.
+   * @throws NumberFormatException  If a non-empty entry is not a valid int.
+   */
+  static int[] getIntArray(String arrayStr) {
+    String[] entries = arrayStr.split(",");
+    int nonEmptyEntries = 0;
+    for (int i = 0; i < entries.length; i++) {
+      // Trim once up front so that the emptiness test and the parse below
+      // agree; Integer.parseInt rejects surrounding whitespace.
+      entries[i] = entries[i].trim();
+      if (!entries[i].isEmpty()) {
+        nonEmptyEntries++;
+      }
+    }
+    int[] ret = new int[nonEmptyEntries];
+    int next = 0;
+    for (String entry : entries) {
+      if (!entry.isEmpty()) {
+        ret[next++] = Integer.parseInt(entry);
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Extract the receiver settings from an HTraceConfiguration.
+   *
+   * Most numeric settings are bounds-checked; out-of-range values are
+   * clamped with a warning.  The span drop timeout is read as-is.
+   *
+   * @param conf          The configuration to read from.
+   * @throws IOException  If the ADDRESS_KEY value is missing or cannot be
+   *                        parsed as a host:port pair.
+   */
+  Conf(HTraceConfiguration conf) throws IOException {
+    this.ioTimeoutMs = getBoundedInt(conf, IO_TIMEOUT_MS_KEY,
+              IO_TIMEOUT_MS_DEFAULT,
+              IO_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+    this.connectTimeoutMs = getBoundedInt(conf, CONNECT_TIMEOUT_MS_KEY,
+              CONNECT_TIMEOUT_MS_DEFAULT,
+              CONNECT_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+    this.idleTimeoutMs = getBoundedInt(conf, IDLE_TIMEOUT_MS_KEY,
+              IDLE_TIMEOUT_MS_DEFAULT,
+              IDLE_TIMEOUT_MS_MIN, Integer.MAX_VALUE);
+    this.flushRetryDelays = getIntArray(conf.get(FLUSH_RETRY_DELAYS_KEY,
+              FLUSH_RETRY_DELAYS_DEFAULT));
+    this.maxFlushIntervalMs = getBoundedInt(conf, MAX_FLUSH_INTERVAL_MS_KEY,
+              MAX_FLUSH_INTERVAL_MS_DEFAULT,
+              MAX_FLUSH_INTERVAL_MS_MIN, Integer.MAX_VALUE);
+    this.packed = conf.getBoolean(PACKED_KEY, PACKED_DEFAULT);
+    this.bufferSize = getBoundedInt(conf, BUFFER_SIZE_KEY,
+              BUFFER_SIZE_DEFAULT,
+              BUFFER_SIZE_MIN, BUFFER_SIZE_MAX);
+    double triggerFraction = getBoundedDouble(conf,
+              BUFFER_SEND_TRIGGER_FRACTION_KEY,
+              BUFFER_SEND_TRIGGER_FRACTION_DEFAULT,
+              BUFFER_SEND_TRIGGER_FRACTION_MIN, 1.0);
+    this.spanDropTimeoutMs = conf.getInt(SPAN_DROP_TIMEOUT_MS_KEY,
+        SPAN_DROP_TIMEOUT_MS_DEFAULT);
+    this.errorLogPeriodMs = getBoundedLong(conf, ERROR_LOG_PERIOD_MS_KEY,
+        ERROR_LOG_PERIOD_MS_DEFAULT, 0, Long.MAX_VALUE);
+    // The flush trigger is stored as an absolute size, not as a fraction.
+    this.triggerSize = (int)(this.bufferSize * triggerFraction);
+    try {
+      this.endpointStr = conf.get(ADDRESS_KEY, "");
+      this.endpoint = parseHostPortPair(endpointStr);
+    } catch (IOException e) {
+      // Name the offending key in the message, and keep the original
+      // exception as the cause so its stack trace is not lost.
+      throw new IOException("Error reading " + ADDRESS_KEY + ": " +
+          e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Render this configuration as JSON, using the @JsonProperty-annotated
+   * fields.
+   *
+   * @return  The JSON text for this object.
+   * @throws RuntimeException  If Jackson fails to serialize the object.
+   */
+  @Override
+  public String toString() {
+    try {
+      return new ObjectMapper()
+          .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
+          .writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
deleted file mode 100644
index 643bbd5..0000000
--- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.htrace.impl;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.ArrayDeque;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.htrace.core.HTraceConfiguration;
-import org.apache.htrace.core.Span;
-import org.apache.htrace.core.SpanReceiver;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.api.ContentResponse;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.util.StringContentProvider;
-import org.eclipse.jetty.http.HttpField;
-import org.eclipse.jetty.http.HttpHeader;
-import org.eclipse.jetty.http.HttpMethod;
-import org.eclipse.jetty.http.HttpStatus;
-
-/**
- * A {@link SpanReceiver} that passes Spans to htraced via REST. Implementation minimizes
- * dependencies and aims for small footprint since this client will be the guest of another,
- * the process traced.
- *
- * <p>Logs via commons-logging. Uses jetty client. Jetty has its own logging. To connect, see
- * jetty logging to commons-logging and see https://issues.apache.org/jira/browse/HADOOP-6807
- * and http://blogs.bytecode.com.au/glen/2005/06/21/getting-your-logging-working-in-jetty.html.
- *
- * <p>This client depends on the REST defined in <code>rest.go</code> in the htraced REST server.
- *
- * <p>Create an instance by doing:
- * <code>SpanReceiver receiver = new HTracedRESTReceiver(conf);</code> where conf is an instance
- * of {@link HTraceConfiguration}. See the public keys defined below for what we will look for in
- * the configuration.  For example, set {@link #HTRACED_REST_URL_KEY} if
- * <code>htraced</code> is in a non-standard location. Then call
- * <code>receiver.receiveSpan(Span);</code> to send spans to an htraced
- * instance. This method returns immediately. It sends the spans in background.
- *
- * <p>TODO: Shading works?
- * TODO: Add lazy start; don't start background thread till a span gets queued.
- * TODO: Add some metrics; how many times we've run, how many spans and what size we've sent.
- */
-public class HTracedRESTReceiver extends SpanReceiver {
-  private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);
-
-  /**
-   * The HttpClient to use for this receiver.
-   */
-  private final HttpClient httpClient;
-
-  /**
-   * The maximum number of spans to buffer.
-   */
-  private final int capacity;
-
-  /**
-   * REST URL to use writing Spans.
-   */
-  private final String url;
-
-  /**
-   * The maximum number of spans to send in a single PUT.
-   */
-  private final int maxToSendAtATime;
-
-  /**
-   * Runs background task to do the REST PUT.
-   */
-  private final PostSpans postSpans;
-
-  /**
-   * Thread for postSpans
-   */
-  private final Thread postSpansThread;
-
-  /**
-   * The connection timeout in milliseconds.
-   */
-  public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = 
"client.connect.timeout.ms";
-  private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;
-
-  /**
-   * The idle timeout in milliseconds.
-   */
-  public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = 
"client.idle.timeout.ms";
-  private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;
-
-  /**
-   * URL of the htraced REST server we are to talk to.
-   */
-  public static final String HTRACED_REST_URL_KEY = "htraced.rest.url";
-  private static final String HTRACED_REST_URL_DEFAULT = 
"http://localhost:9095/";;
-
-  /**
-   * Maximum size of the queue to accumulate spans in.
-   * Cleared by the background thread that does the REST POST to htraced.
-   */
-  public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = 
"client.rest.queue.capacity";
-  private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000;
-
-  /**
-   * Period at which the background thread that does the REST POST to htraced 
in ms.
-   */
-  public static final String CLIENT_REST_PERIOD_MS_KEY = 
"client.rest.period.ms";
-  private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000;
-
-  /**
-   * Maximum spans to post to htraced at a time.
-   */
-  public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY =
-    "client.rest.batch.size";
-  private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100;
-
-  /**
-   * Lock protecting the PostSpans data.
-   */
-  private ReentrantLock lock = new ReentrantLock();
-
-  /**
-   * Condition variable used to wake up the PostSpans thread.
-   */
-  private Condition cond = lock.newCondition();
-
-  /**
-   * True if we should shut down.
-   * Protected by the lock.
-   */
-  private boolean shutdown = false;
-
-  /**
-   * Simple bounded queue to hold spans between periodic runs of the 
httpclient.
-   * Protected by the lock.
-   */
-  private final ArrayDeque<Span> spans;
-
-  /**
-   * Keep last time we logged we were at capacity; used to prevent flooding of 
logs with
-   * "at capacity" messages.
-   */
-  private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L);
-
-  /**
-   * True if we should flush as soon as possible.  Protected by the lock.
-   */
-  private boolean mustStartFlush;
-
-  /**
-   * Create an HttpClient instance.
-   *
-   * @param connTimeout         The timeout to use for connecting.
-   * @param idleTimeout         The idle timeout to use.
-   */
-  HttpClient createHttpClient(long connTimeout, long idleTimeout) {
-    HttpClient httpClient = new HttpClient();
-    httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
-      this.getClass().getSimpleName()));
-    httpClient.setConnectTimeout(connTimeout);
-    httpClient.setIdleTimeout(idleTimeout);
-    return httpClient;
-  }
-
-  /**
-   * Constructor.
-   * You must call {@link #close()} post construction when done.
-   * @param conf
-   * @throws Exception
-   */
-  public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
-    int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY,
-                                  CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
-    int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY,
-                                  CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
-    this.httpClient = createHttpClient(connTimeout, idleTimeout);
-    this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, 
CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
-    this.spans = new ArrayDeque<Span>(capacity);
-    // Build up the writeSpans URL.
-    URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, 
HTRACED_REST_URL_DEFAULT));
-    URL url = new URL(restServer.getProtocol(), restServer.getHost(), 
restServer.getPort(), "/writeSpans");
-    this.url = url.toString();
-    // Period at which we run the background thread that does the REST POST to 
htraced.
-    int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, 
CLIENT_REST_PERIOD_MS_DEFAULT);
-    // Maximum spans to send in one go
-    this.maxToSendAtATime =
-      conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, 
CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT);
-    // Start up the httpclient.
-    this.httpClient.start();
-    // Start the background thread.
-    this.postSpans = new PostSpans(periodInMs);
-    this.postSpansThread = new Thread(postSpans);
-    this.postSpansThread.setDaemon(true);
-    this.postSpansThread.setName("PostSpans");
-    this.postSpansThread.start();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created new HTracedRESTReceiver with connTimeout=" +
-            connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" +
-            capacity + ", url=" + url +  ", periodInMs=" + periodInMs +
-            ", maxToSendAtATime=" + maxToSendAtATime);
-    }
-  }
-
-  /**
-   * POST spans runnable.
-   * Run on a period. Services the passed in queue taking spans and sending 
them to traced via http.
-   */
-  private class PostSpans implements Runnable {
-    private final long periodInNs;
-    private final ArrayDeque<Span> spanBuf;
-
-    private PostSpans(long periodInMs) {
-      this.periodInNs = TimeUnit.NANOSECONDS.
-          convert(periodInMs, TimeUnit.MILLISECONDS);
-      this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime);
-    }
-
-    /**
-     * The span sending thread.
-     *
-     * We send a batch of spans for one of two reasons: there are already
-     * maxToSendAtATime spans in the buffer, or the client.rest.period.ms
-     * has elapsed.  The idea is that we want to strike a balance between
-     * sending a lot of spans at a time, for efficiency purposes, and
-     * making sure that we don't buffer spans locally for too long.
-     *
-     * The longer we buffer spans locally, the longer we will have to wait
-     * to see the results of our client operations in the GUI, and the higher
-     * the risk of losing them if the client crashes.
-     */
-    @Override
-    public void run() {
-      long waitNs;
-      try {
-        waitNs = periodInNs;
-        while (true) {
-          lock.lock();
-          try {
-            if (shutdown) {
-              if (spans.isEmpty()) {
-                LOG.debug("Shutting down PostSpans thread...");
-                break;
-              }
-            } else {
-              try {
-                waitNs = cond.awaitNanos(waitNs);
-                if (mustStartFlush) {
-                  waitNs = 0;
-                  mustStartFlush = false;
-                }
-              } catch (InterruptedException e) {
-                LOG.info("Got InterruptedException");
-                waitNs = 0;
-              }
-            }
-            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) ||
-                    shutdown) {
-              loadSpanBuf();
-              waitNs = periodInNs;
-            }
-          } finally {
-            lock.unlock();
-          }
-          // Once the lock has been safely released, we can do some network
-          // I/O without blocking the client process.
-          if (!spanBuf.isEmpty()) {
-            sendSpans();
-            spanBuf.clear();
-          }
-        }
-      } finally {
-        if (httpClient != null) {
-          try {
-            httpClient.stop();
-          } catch (Exception e) {
-            LOG.error("Error shutting down httpClient", e);
-          }
-        }
-        spans.clear();
-      }
-    }
-
-    private void loadSpanBuf() {
-      for (int loaded = 0; loaded < maxToSendAtATime; loaded++) {
-        Span span = spans.pollFirst();
-        if (span == null) {
-          return;
-        }
-        spanBuf.add(span);
-      }
-    }
-
-    private void sendSpans() {
-      try {
-        Request request = httpClient.newRequest(url).method(HttpMethod.POST);
-        request.header(HttpHeader.CONTENT_TYPE, "application/json");
-        StringBuilder bld = new StringBuilder();
-        for (Span span : spanBuf) {
-          bld.append(span.toJson());
-        }
-        request.content(new StringContentProvider(bld.toString()));
-        ContentResponse response = request.send();
-        if (response.getStatus() == HttpStatus.OK_200) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("POSTED " + spanBuf.size() + " spans");
-          }
-        } else {
-          LOG.error("Status: " + response.getStatus());
-          LOG.error(response.getHeaders());
-          LOG.error(response.getContentAsString());
-        }
-      } catch (InterruptedException e) {
-        LOG.error(e);
-      } catch (TimeoutException e) {
-        LOG.error(e);
-      } catch (ExecutionException e) {
-        LOG.error(e);
-      }
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    LOG.debug("Closing HTracedRESTReceiver(" + url + ").");
-    lock.lock();
-    try {
-      this.shutdown = true;
-      cond.signal();
-    } finally {
-      lock.unlock();
-    }
-    try {
-      postSpansThread.join(120000);
-      if (postSpansThread.isAlive()) {
-        LOG.error("Timed out without closing HTracedRESTReceiver(" +
-                  url + ").");
-      } else {
-        LOG.debug("Closed HTracedRESTReceiver(" + url + ").");
-      }
-    } catch (InterruptedException e) {
-      LOG.error("Interrupted while joining postSpans", e);
-    }
-  }
-
-  /**
-   * Start flushing the buffered spans.
-   *
-   * Note that even after calling this function, you will still have to wait
-   * for the flush to finish happening.  This function just starts the flush;
-   * it does not block until it has completed.  You also do not get
-   * "read-after-write consistency" with htraced... the spans that are
-   * written may be buffered for a short period of time prior to being
-   * readable.  This is not a problem for production use (since htraced is not
-   * a database), but it means that most unit tests will need a loop in their
-   * "can I read what I wrote" tests.
-   */
-  void startFlushing() {
-    LOG.info("Triggering HTracedRESTReceiver flush.");
-    lock.lock();
-    try {
-      mustStartFlush = true;
-      cond.signal();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private static long WARN_TIMEOUT_MS = 300000;
-
-  @Override
-  public void receiveSpan(Span span) {
-    boolean added = false;
-    lock.lock();
-    try {
-      if (shutdown) {
-        LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " +
-            "is already shut down.");
-        return;
-      }
-      if (spans.size() < capacity) {
-        spans.add(span);
-        added = true;
-        if (spans.size() >= maxToSendAtATime) {
-          cond.signal();
-        }
-      } else {
-        cond.signal();
-      }
-    } finally {
-      lock.unlock();
-    }
-    if (!added) {
-      long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(),
-          TimeUnit.NANOSECONDS);
-      long last = lastAtCapacityWarningLog.get();
-      if (now - last > WARN_TIMEOUT_MS) {
-        // Only log every 5 minutes. Any more than this for a guest process
-        // is obnoxious.
-        if (lastAtCapacityWarningLog.compareAndSet(last, now)) {
-          // If the atomic-compare-and-set succeeds, we should log.  Otherwise,
-          // we should assume another thread already logged and bumped up the
-          // value of lastAtCapacityWarning sometime between our get and the
-          // "if" statement.
-          LOG.warn("There are too many HTrace spans to buffer!  We have " +
-              "already buffered " + capacity + " spans.  Dropping spans.");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
new file mode 100644
index 0000000..f5f493c
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.htrace.core.HTraceConfiguration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanReceiver;
+
+/**
+ * The SpanReceiver which sends spans to htraced.
+ *
+ * HTracedSpanReceiver sends trace spans out to the htraced daemon, where they
+ * are stored and indexed.  It supports two forms of RPC: a JSON/HTTP form, and
+ * an HRPC/msgpack form.  We will use the msgpack form when
+ * htraced.receiver.packed is set to true.
+ *
+ * HTraced buffers are several megabytes in size, and we reuse them to avoid
+ * creating extra garbage on the heap.  They are flushed whenever a timeout
+ * elapses, or when they get more than a configurable percent full.  We allocate
+ * two buffers so that we can continue filling one buffer while the other is
+ * being sent over the wire.  The buffers store serialized spans.  This is
+ * better than storing references to span objects because it minimizes the amount
+ * of pointers we have to follow during a GC.  Buffers are managed by instances
+ * of BufferManager.
+ */
+public class HTracedSpanReceiver extends SpanReceiver {
+  private static final Log LOG = LogFactory.getLog(HTracedSpanReceiver.class);
+
+  private final static int MAX_CLOSING_WAIT_MS = 120000;
+
+  private final FaultInjector faultInjector;
+
+  private final Conf conf;
+
+  private final ReentrantLock lock = new ReentrantLock();
+
+  private final Condition wakePostSpansThread = lock.newCondition();
+
+  private final BufferManager bufferManager[] = new BufferManager[2];
+
+  private final RateLimitedLogger flushErrorLog;
+
+  private final RateLimitedLogger spanDropLog;
+
+  private final PostSpansThread thread;
+
+  private boolean shutdown = false;
+
+  private int activeBuf = 0;
+
+  private int flushingBuf = -1;
+
+  private long lastBufferClearedTimeMs = 0;
+
+  static class FaultInjector {
+    static FaultInjector NO_OP = new FaultInjector();
+    public void handleContentLengthTrigger(int len) { }
+    public void handleThreadStart() throws Exception { }
+    public void handleFlush() throws IOException { }
+  }
+
+  public HTracedSpanReceiver(HTraceConfiguration c) throws Exception {
+    this(c, FaultInjector.NO_OP);
+  }
+
+  HTracedSpanReceiver(HTraceConfiguration c,
+      FaultInjector faultInjector) throws Exception {
+    this.faultInjector = faultInjector;
+    this.conf = new Conf(c);
+    if (this.conf.packed) {
+      for (int i = 0; i < bufferManager.length; i++) {
+        bufferManager[i] = new PackedBufferManager(conf);
+      }
+    } else {
+      for (int i = 0; i < bufferManager.length; i++) {
+        bufferManager[i] = new RestBufferManager(conf);
+      }
+    }
+    this.flushErrorLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs);
+    this.spanDropLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs);
+    this.thread = new PostSpansThread();
+    LOG.debug("Created new HTracedSpanReceiver with " + conf.toString());
+  }
+
+  /**
+   * Serialize a span into the active buffer, retrying while it is full.
+   *
+   * Each attempt is made while holding the lock.  When the write succeeds and
+   * the buffer now holds more than conf.triggerSize bytes, the PostSpans
+   * thread is signalled.  When the write throws, the attempt is repeated
+   * after sleeping for a third of conf.spanDropTimeoutMs, until more than
+   * conf.spanDropTimeoutMs milliseconds have passed since the first failure;
+   * the span is then dropped and a rate-limited error is logged.  Spans
+   * received after the shutdown flag has been set are discarded.  If the
+   * calling thread is interrupted while waiting to retry, its interrupt
+   * status is restored and the span is dropped rather than retried.
+   *
+   * @param span  The span to buffer.
+   */
+  @Override
+  public void receiveSpan(Span span) {
+    long startTimeMs = 0;
+    int numTries = 1;
+    while (true) {
+      lock.lock();
+      try {
+        if (shutdown) {
+          LOG.info("Unable to add span because HTracedSpanReceiver is " +
+              "shutting down.");
+          return;
+        }
+        Throwable exc = null;
+        try {
+          bufferManager[activeBuf].writeSpan(span);
+          int contentLength = bufferManager[activeBuf].contentLength();
+          if (contentLength > conf.triggerSize) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Triggering buffer #" + activeBuf + " flush because" +
+                  " buffer contains " + contentLength + " bytes, and " +
+                  "triggerSize is " + conf.triggerSize);
+            }
+            faultInjector.handleContentLengthTrigger(contentLength);
+            wakePostSpansThread.signal();
+          }
+          return;
+        } catch (Exception e) {
+          exc = e;
+        } catch (Error e) {
+          exc = e;
+        }
+        // The write failed.  Start the drop timer on the first failure only.
+        if (startTimeMs == 0) {
+          startTimeMs = TimeUtil.nowMs();
+        }
+        long deltaMs = TimeUtil.deltaMs(startTimeMs, TimeUtil.nowMs());
+        if (deltaMs > conf.spanDropTimeoutMs) {
+          spanDropLog.error("Dropping a span after unsuccessfully " +
+              "attempting to add it for " + deltaMs + " ms.  There is not " +
+              "enough buffer space. Please increase " + Conf.BUFFER_SIZE_KEY +
+              " or decrease the rate of spans being generated.");
+          return;
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to write span to buffer #" + activeBuf +
+              " after " + numTries + " attempt(s) and " + deltaMs + " ms" +
+              ".  Buffer already has " +
+                  bufferManager[activeBuf].getNumberOfSpans() + " spans.",
+              exc);
+        }
+        numTries++;
+      } finally {
+        lock.unlock();
+      }
+      // Sleep without holding the lock so that other threads can make
+      // progress while we wait for buffer space.
+      try {
+        Thread.sleep(conf.spanDropTimeoutMs / 3);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and give up.  Looping again would make
+        // every later sleep() throw immediately, turning the retry loop into
+        // a busy-wait on the lock until the drop timeout expires.
+        Thread.currentThread().interrupt();
+        spanDropLog.error("Dropping a span because the thread was " +
+            "interrupted while waiting for buffer space.");
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    lock.lock();
+    try {
+      shutdown = true;
+      wakePostSpansThread.signal();
+    } finally {
+      lock.unlock();
+    }
+    try {
+      thread.join(MAX_CLOSING_WAIT_MS);
+    } catch (InterruptedException e) {
+      LOG.error("HTracedSpanReceiver#close was interrupted", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private class PostSpansThread extends Thread {
+    PostSpansThread() {
+      this.setDaemon(true);
+      this.setName("PostSpans");
+      this.start();
+    }
+
+    private boolean shouldWaitForCond(long timeSinceLastClearedMs) {
+      if (shutdown) {
+        // If we're shutting down, don't wait around.
+        LOG.trace("Should not wait for cond because we're shutting down.");
+        return false;
+      }
+      int contentLength = bufferManager[activeBuf].contentLength();
+      if (contentLength == 0) {
+        // If there is nothing in the buffer, there is nothing to do.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Should wait for cond because we have no data buffered " +
+              "in bufferManager " + activeBuf);
+        }
+        lastBufferClearedTimeMs = TimeUtil.nowMs();
+        return true;
+      } else if (contentLength >= conf.triggerSize) {
+        // If the active buffer is filling up, start flushing.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Should not wait for cond because we have more than " +
+              conf.triggerSize + " bytes buffered in bufferManager " +
+              activeBuf);
+        }
+        return false;
+      }
+      if (timeSinceLastClearedMs > conf.maxFlushIntervalMs) {
+        // If we have let the spans sit in the buffer for too long,
+        // start flushing.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Should not wait for cond because it has been " +
+              timeSinceLastClearedMs + " ms since our last flush, and we " +
+              "are overdue for another because maxFlushIntervalMs is " +
+              conf.maxFlushIntervalMs);
+        }
+        return false;
+      }
+      LOG.trace("Should wait for cond.");
+      return true;
+    }
+
+    /**
+     * Main loop of the thread which posts spans to htraced.
+     *
+     * Under the lock, waits until shouldWaitForCond says a flush is due, then
+     * swaps the active buffer with the idle one.  The buffer that was active
+     * is flushed and cleared after the lock has been released, so that
+     * receiveSpan can fill the other buffer in the meantime.  The loop ends
+     * once shutdown is set and the active buffer is empty, or when an
+     * unexpected Throwable is caught; every buffer manager is closed on the
+     * way out.
+     */
+    @Override
+    public void run() {
+      try {
+        faultInjector.handleThreadStart();
+        LOG.debug("Starting HTracedSpanReceiver thread for " +
+            conf.endpointStr);
+        BufferManager flushBufManager = null;
+        while (true) {
+          lock.lock();
+          flushingBuf = -1;
+          try {
+            while (true) {
+              long timeSinceLastClearedMs = TimeUtil.
+                deltaMs(lastBufferClearedTimeMs, TimeUtil.nowMs());
+              if (!shouldWaitForCond(timeSinceLastClearedMs)) {
+                break;
+              }
+              // Wait only for what is left of the flush interval.  The
+              // earlier timestamp goes first, as in every other deltaMs call
+              // in this class; with the arguments swapped the result would
+              // not be the elapsed time.  The delta is recomputed here
+              // because shouldWaitForCond may have just updated
+              // lastBufferClearedTimeMs.
+              long waitMs = conf.maxFlushIntervalMs -
+                  Math.min(conf.maxFlushIntervalMs, TimeUtil.
+                      deltaMs(lastBufferClearedTimeMs, TimeUtil.nowMs()));
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Waiting on wakePostSpansThread for " + waitMs +
+                    " ms.");
+              }
+              try {
+                wakePostSpansThread.await(waitMs, TimeUnit.MILLISECONDS);
+              } catch (InterruptedException e) {
+                LOG.info("HTraceSpanReceiver thread was interrupted.", e);
+                throw e;
+              }
+            }
+            if (shutdown && (bufferManager[activeBuf].contentLength() == 0)) {
+              LOG.debug("PostSpansThread shutting down.");
+              return;
+            }
+            // Hand the filled buffer to the flusher and make the other
+            // buffer the one that receiveSpan writes to.
+            flushingBuf = activeBuf;
+            flushBufManager = bufferManager[flushingBuf];
+            activeBuf = (activeBuf == 1) ? 0 : 1;
+          } finally {
+            lock.unlock();
+          }
+          doFlush(flushBufManager);
+          flushBufManager.clear();
+          lastBufferClearedTimeMs = TimeUtil.nowMs();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("setting lastBufferClearedTimeMs to " +
+                lastBufferClearedTimeMs);
+          }
+        }
+      } catch (Throwable e) {
+        LOG.error("PostSpansThread exiting on unexpected exception", e);
+      } finally {
+        for (int i = 0; i < bufferManager.length; i++) {
+          bufferManager[i].close();
+        }
+      }
+    }
+
+    private void doFlush(BufferManager flushBufManager)
+        throws InterruptedException {
+      try {
+        flushBufManager.prepare();
+      } catch (IOException e) {
+        LOG.error("Failed to prepare buffer containing " +
+            flushBufManager.getNumberOfSpans() + " spans for " +
+            "sending to " + conf.endpointStr + " Discarding " +
+            "all spans.", e);
+        return;
+      }
+      int flushTries = 0;
+      while (true) {
+        Throwable exc;
+        try {
+          faultInjector.handleFlush();
+          flushBufManager.flush();
+          exc = null;
+        } catch (RuntimeException e) {
+          exc = e;
+        } catch (Exception e) {
+          exc = e;
+        }
+        if (exc == null) {
+          return;
+        }
+        int numSpans = flushBufManager.getNumberOfSpans();
+        String excMessage = "Failed to flush " + numSpans  + " htrace " +
+            "spans to " + conf.endpointStr + " on try " + (flushTries + 1);
+        if (flushTries >= conf.flushRetryDelays.length) {
+          excMessage += ".  Discarding all spans.";
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.error(excMessage, exc);
+        } else {
+          flushErrorLog.error(excMessage, exc);
+        }
+        if (flushTries >= conf.flushRetryDelays.length) {
+          return;
+        }
+        int delayMs = conf.flushRetryDelays[flushTries];
+        Thread.sleep(delayMs);
+        flushTries++;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
----------------------------------------------------------------------
diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
new file mode 100644
index 0000000..f867ad7
--- /dev/null
+++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.core.MilliSpan;
+import org.apache.htrace.core.TimelineAnnotation;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.MessageBuffer;
+import org.msgpack.core.buffer.MessageBufferOutput;
+
+import org.apache.htrace.core.Span;
+import org.apache.htrace.core.SpanId;
+
+/**
+ * A ByteBuffer which we are writing msgpack data to.
+ */
+class PackedBuffer {
+  /**
+   * A MessageBufferOutput that simply outputs to a ByteBuffer.
+   */
+  private class PackedBufferOutput implements MessageBufferOutput {
+    private MessageBuffer savedBuffer;
+
+    PackedBufferOutput() {
+    }
+
+    @Override
+    public MessageBuffer next(int bufferSize) throws IOException {
+      if (savedBuffer == null || savedBuffer.size() != bufferSize) {
+        savedBuffer = MessageBuffer.newBuffer(bufferSize);
+      }
+      MessageBuffer buffer = savedBuffer;
+      savedBuffer = null;
+      return buffer;
+    }
+
+    @Override
+    public void flush(MessageBuffer buffer) throws IOException {
+      ByteBuffer b = buffer.toByteBuffer();
+      bb.put(b);
+      savedBuffer = buffer;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(PackedBuffer.class);
+  private static final Charset UTF8 = StandardCharsets.UTF_8;
+  private static final byte SPANS[] = "Spans".getBytes(UTF8);
+  private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8);
+  private static final byte A[] = "a".getBytes(UTF8);
+  private static final byte B[] = "b".getBytes(UTF8);
+  private static final byte E[] = "e".getBytes(UTF8);
+  private static final byte D[] = "d".getBytes(UTF8);
+  private static final byte R[] = "r".getBytes(UTF8);
+  private static final byte P[] = "p".getBytes(UTF8);
+  private static final byte N[] = "n".getBytes(UTF8);
+  private static final byte T[] = "t".getBytes(UTF8);
+  private static final byte M[] = "m".getBytes(UTF8);
+  private static final int HRPC_MAGIC = 0x43525448;
+  static final int HRPC_REQ_FRAME_LENGTH = 20;
+  static final int HRPC_RESP_FRAME_LENGTH = 20;
+  static final int MAX_HRPC_ERROR_LENGTH = 4 * 1024 * 1024;
+  static final int MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024;
+  private static final int SPAN_ID_BYTE_LENGTH = 16;
+  static final MessagePack.Config MSGPACK_CONF =
+      new MessagePack.ConfigBuilder()
+        .readBinaryAsString(false)
+        .readStringAsBinary(false)
+        .build();
+  /**
+   * The array which we are filling.
+   */
+  final ByteBuffer bb;
+
+  /**
+   * Used to tell the MessagePacker to output to our array.
+   */
+  final PackedBufferOutput out;
+
+  /**
+   * A temporary buffer for serializing span ids and other things.
+   */
+  final byte[] temp;
+
+  /**
+   * Generates msgpack output.
+   */
+  final MessagePacker packer;
+
+  /**
+   * Create a new PackedBuffer.
+   *
+   * @param bb        The ByteBuffer to use to create the packed buffer.
+   */
+  PackedBuffer(ByteBuffer bb) {
+    this.bb = bb;
+    this.out = new PackedBufferOutput();
+    this.temp = new byte[SPAN_ID_BYTE_LENGTH];
+    this.packer = new MessagePacker(out, MSGPACK_CONF);
+  }
+
+  /**
+   * Write the fixed-length request frame which starts packed RPC messages.
+   */
+  static void writeReqFrame(ByteBuffer bb, int methodId, long seq, int length)
+      throws IOException {
+    int oldPos = bb.position();
+    boolean success = false;
+    try {
+      bb.order(ByteOrder.LITTLE_ENDIAN);
+      bb.putInt(HRPC_MAGIC);
+      bb.putInt(methodId);
+      bb.putLong(seq);
+      bb.putInt(length);
+      success = true;
+    } finally {
+      if (!success) {
+        bb.position(oldPos);
+      }
+    }
+  }
+
+  /**
+   * Write an 8-byte value to a byte array as big-endian.
+   */
+  private static void longToBigEndian(byte b[], int pos, long val) {
+    // Most significant byte first: the shift counts run 56, 48, ..., 8, 0.
+    // The narrowing cast keeps only the low eight bits of each shifted value.
+    for (int i = 0; i < 8; i++) {
+      b[pos + i] = (byte) (val >> (56 - (8 * i)));
+    }
+  }
+
+  private void writeSpanId(SpanId spanId) throws IOException {
+    longToBigEndian(temp, 0, spanId.getHigh());
+    longToBigEndian(temp, 8, spanId.getLow());
+    packer.packBinaryHeader(SPAN_ID_BYTE_LENGTH);
+    packer.writePayload(temp, 0, SPAN_ID_BYTE_LENGTH);
+  }
+
+  /**
+   * Serialize a span and append it to the underlying ByteBuffer.
+   */
+  void writeSpan(Span span) throws IOException {
+    boolean success = false;
+    int oldPos = bb.position();
+    try {
+      int mapSize = 0;
+      if (span.getSpanId().isValid()) {
+        mapSize++;
+      }
+      if (span.getStartTimeMillis() != 0) {
+        mapSize++;
+      }
+      if (span.getStopTimeMillis() != 0) {
+        mapSize++;
+      }
+      if (!span.getDescription().isEmpty()) {
+        mapSize++;
+      }
+      if (!span.getTracerId().isEmpty()) {
+        mapSize++;
+      }
+      if (span.getParents().length > 0) {
+        mapSize++;
+      }
+      if (!span.getKVAnnotations().isEmpty()) {
+        mapSize++;
+      }
+      if (!span.getTimelineAnnotations().isEmpty()) {
+        mapSize++;
+      }
+      packer.packMapHeader(mapSize);
+      if (span.getSpanId().isValid()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(A);
+        writeSpanId(span.getSpanId());
+      }
+      if (span.getStartTimeMillis() != 0) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(B);
+        packer.packLong(span.getStartTimeMillis());
+      }
+      if (span.getStopTimeMillis() != 0) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(E);
+        packer.packLong(span.getStopTimeMillis());
+      }
+      if (!span.getDescription().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(D);
+        packer.packString(span.getDescription());
+      }
+      if (!span.getTracerId().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(R);
+        packer.packString(span.getTracerId());
+      }
+      if (span.getParents().length > 0) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(P);
+        packer.packArrayHeader(span.getParents().length);
+        for (int i = 0; i < span.getParents().length; i++) {
+          writeSpanId(span.getParents()[i]);
+        }
+      }
+      if (!span.getKVAnnotations().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(N);
+        Map<String, String> map = span.getKVAnnotations();
+        packer.packMapHeader(map.size());
+        for (Map.Entry<String, String> entry : map.entrySet()) {
+          packer.packString(entry.getKey());
+          packer.packString(entry.getValue());
+        }
+      }
+      if (!span.getTimelineAnnotations().isEmpty()) {
+        packer.packRawStringHeader(1);
+        packer.writePayload(T);
+        List<TimelineAnnotation> list = span.getTimelineAnnotations();
+        packer.packArrayHeader(list.size());
+        for (TimelineAnnotation annotation : list) {
+          packer.packMapHeader(2);
+          packer.packRawStringHeader(1);
+          packer.writePayload(T);
+          packer.packLong(annotation.getTime());
+          packer.packRawStringHeader(1);
+          packer.writePayload(M);
+          packer.packString(annotation.getMessage());
+        }
+      }
+      packer.flush();
+      success = true;
+    } finally {
+      if (!success) {
+        // If we failed earlier, restore the old position.
+        // This is so that if we run out of space, we don't add a
+        // partial span to the buffer.
+        bb.position(oldPos);
+      }
+    }
+  }
+
+  static SpanId readSpanId(MessageUnpacker unpacker) throws IOException {
+    int alen = unpacker.unpackBinaryHeader();
+    if (alen != SPAN_ID_BYTE_LENGTH) {
+      throw new IOException("Invalid length given for spanID array.  " +
+          "Expected " + SPAN_ID_BYTE_LENGTH + "; got " + alen);
+    }
+    byte[] payload = new byte[SPAN_ID_BYTE_LENGTH];
+    unpacker.readPayload(payload);
+    // A freshly wrapped ByteBuffer reads in big-endian order, so the first
+    // eight bytes decode to the high word and the last eight to the low word.
+    ByteBuffer view = ByteBuffer.wrap(payload);
+    long high = view.getLong();
+    long low = view.getLong();
+    return new SpanId(high, low);
+  }
+
+  /**
+   * Read a span.  Used in unit tests.  Not optimized.
+   */
+  static Span readSpan(MessageUnpacker unpacker) throws IOException {
+    int numEntries = unpacker.unpackMapHeader();
+    MilliSpan.Builder builder = new MilliSpan.Builder();
+    while (--numEntries >= 0) {
+      String key = unpacker.unpackString();
+      if (key.length() != 1) {
+        throw new IOException("Unknown key " + key);
+      }
+      switch (key.charAt(0)) {
+        case 'a':
+          builder.spanId(readSpanId(unpacker));
+          break;
+        case 'b':
+          builder.begin(unpacker.unpackLong());
+          break;
+        case 'e':
+          builder.end(unpacker.unpackLong());
+          break;
+        case 'd':
+          builder.description(unpacker.unpackString());
+          break;
+        case 'r':
+          builder.tracerId(unpacker.unpackString());
+          break;
+        case 'p':
+          int numParents = unpacker.unpackArrayHeader();
+          SpanId[] parents = new SpanId[numParents];
+          for (int i = 0; i < numParents; i++) {
+            parents[i] = readSpanId(unpacker);
+          }
+          builder.parents(parents);
+          break;
+        case 'n':
+          int mapEntries = unpacker.unpackMapHeader();
+          HashMap<String, String> entries =
+              new HashMap<String, String>(mapEntries);
+          for (int i = 0; i < mapEntries; i++) {
+            String k = unpacker.unpackString();
+            String v = unpacker.unpackString();
+            entries.put(k, v);
+          }
+          builder.traceInfo(entries);
+          break;
+        case 't':
+          int listEntries = unpacker.unpackArrayHeader();
+          ArrayList<TimelineAnnotation> list =
+              new ArrayList<TimelineAnnotation>(listEntries);
+          for (int i = 0; i < listEntries; i++) {
+            int timelineObjectSize = unpacker.unpackMapHeader();
+            long time = 0;
+            String msg = "";
+            for (int j = 0; j < timelineObjectSize; j++) {
+              String tlKey = unpacker.unpackString();
+              if (tlKey.length() != 1) {
+                throw new IOException("Unknown timeline map key " + tlKey);
+              }
+              switch (tlKey.charAt(0)) {
+                case 't':
+                  time = unpacker.unpackLong();
+                  break;
+                case 'm':
+                  msg = unpacker.unpackString();
+                  break;
+                default:
+                  throw new IOException("Unknown timeline map key " + tlKey);
+              }
+            }
+            list.add(new TimelineAnnotation(time, msg));
+          }
+          builder.timeline(list);
+          break;
+        default:
+          throw new IOException("Unknown key " + key);
+      }
+    }
+    return builder.build();
+  }
+
+  void beginWriteSpansRequest(String defaultPid, int numSpans)
+      throws IOException {
+    boolean success = false;
+    int oldPos = bb.position();
+    try {
+      int mapSize = 1;
+      if (defaultPid != null) {
+        mapSize++;
+      }
+      packer.packMapHeader(mapSize);
+      if (defaultPid != null) {
+        packer.packRawStringHeader(DEFAULT_PID.length);
+        packer.writePayload(DEFAULT_PID);
+        packer.packString(defaultPid);
+      }
+      packer.packRawStringHeader(SPANS.length);
+      packer.writePayload(SPANS);
+      packer.packArrayHeader(numSpans);
+      packer.flush();
+      success = true;
+    } finally {
+      if (!success) {
+        bb.position(oldPos);
+      }
+    }
+  }
+
+  /**
+   * Get the underlying ByteBuffer.
+   */
+  ByteBuffer getBuffer() {
+    return bb;
+  }
+
+  /**
+   * Reset our position in the array.
+   */
+  void reset() throws IOException {
+    packer.reset(out);
+  }
+
+  void close() {
+    try {
+      packer.close();
+    } catch (IOException e) {
+      LOG.error("Error closing MessagePacker", e);
+    }
+  }
+
+  /**
+   * Render the bytes between the start of the buffer and its current
+   * position as two-digit lowercase hex values separated by single spaces.
+   * Works on a duplicate, so the position and limit of the underlying
+   * buffer are left untouched.
+   */
+  public String toHexString() {
+    ByteBuffer view = bb.duplicate();
+    view.flip();
+    StringBuilder hex = new StringBuilder();
+    boolean first = true;
+    while (view.hasRemaining()) {
+      if (!first) {
+        hex.append(" ");
+      }
+      hex.append(String.format("%02x", view.get()));
+      first = false;
+    }
+    return hex.toString();
+  }
+}

Reply via email to