HTRACE-237. Optimize htraced span receiver (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/cb2351d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/cb2351d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/cb2351d2 Branch: refs/heads/master Commit: cb2351d292663582c84885ce899f2777b81ecd30 Parents: 43ce213 Author: Colin P. Mccabe <[email protected]> Authored: Mon Oct 12 13:09:41 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Mon Oct 12 13:13:43 2015 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/htrace/core/Span.java | 8 +- .../java/org/apache/htrace/util/TestUtil.java | 75 +++ .../go/src/org/apache/htrace/client/client.go | 42 +- .../go/src/org/apache/htrace/common/rest.go | 45 -- .../go/src/org/apache/htrace/common/rpc.go | 27 +- .../src/org/apache/htrace/common/span_test.go | 2 +- .../go/src/org/apache/htrace/conf/config.go | 4 +- .../go/src/org/apache/htrace/htrace/cmd.go | 6 +- .../src/org/apache/htrace/htraced/datastore.go | 14 +- .../go/src/org/apache/htrace/htraced/rest.go | 41 +- htrace-htraced/pom.xml | 15 +- .../org/apache/htrace/impl/BufferManager.java | 74 +++ .../main/java/org/apache/htrace/impl/Conf.java | 356 ++++++++++++ .../apache/htrace/impl/HTracedRESTReceiver.java | 431 -------------- .../apache/htrace/impl/HTracedSpanReceiver.java | 333 +++++++++++ .../org/apache/htrace/impl/PackedBuffer.java | 449 +++++++++++++++ .../apache/htrace/impl/PackedBufferManager.java | 340 +++++++++++ .../apache/htrace/impl/RateLimitedLogger.java | 72 +++ .../apache/htrace/impl/RestBufferManager.java | 225 ++++++++ .../java/org/apache/htrace/impl/TimeUtil.java | 78 +++ .../java/org/apache/htrace/impl/DataDir.java | 58 ++ .../org/apache/htrace/impl/HTracedProcess.java | 277 +++++++++ .../htrace/impl/TestHTracedRESTReceiver.java | 215 ------- .../apache/htrace/impl/TestHTracedReceiver.java | 572 
+++++++++++++++++++ .../htrace/impl/TestHTracedReceiverConf.java | 91 +++ .../apache/htrace/impl/TestPackedBuffer.java | 76 +++ .../org/apache/htrace/impl/TestTimeUtil.java | 46 ++ .../java/org/apache/htrace/util/DataDir.java | 97 ---- .../org/apache/htrace/util/HTracedProcess.java | 172 ------ .../apache/htrace/util/TestHTracedProcess.java | 100 ---- 30 files changed, 3199 insertions(+), 1142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-core4/src/main/java/org/apache/htrace/core/Span.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java index e63d414..33908db 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java @@ -17,6 +17,7 @@ package org.apache.htrace.core; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -170,8 +171,11 @@ public interface Span { Map<String, String> traceInfoMap = span.getKVAnnotations(); if (!traceInfoMap.isEmpty()) { jgen.writeObjectFieldStart("n"); - for (Map.Entry<String, String> e : traceInfoMap.entrySet()) { - jgen.writeStringField(e.getKey(), e.getValue()); + String[] keys = traceInfoMap.keySet(). 
+ toArray(new String[traceInfoMap.size()]); + Arrays.sort(keys); + for (String key : keys) { + jgen.writeStringField(key, traceInfoMap.get(key)); } jgen.writeEndObject(); } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java index 7cb4aed..0869ca0 100644 --- a/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java +++ b/htrace-core4/src/test/java/org/apache/htrace/util/TestUtil.java @@ -16,8 +16,18 @@ */ package org.apache.htrace.util; +import org.apache.htrace.core.MilliSpan; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TimelineAnnotation; + import java.io.File; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -88,4 +98,69 @@ public class TestUtil { Thread.sleep(periodMs); } } + + private static long nonZeroRandomLong(Random rand) { + long r = 0; + do { + r = rand.nextLong(); + } while (r == 0); + return r; + } + + private static long positiveRandomLong(Random rand) { + long r = rand.nextLong(); + if (r == Long.MIN_VALUE) { + // Math.abs can't handle this case + return Long.MAX_VALUE; + } else if (r > 0) { + return r; + } else { + return -r; + } + } + + private static String randomString(Random rand) { + return new UUID(positiveRandomLong(rand), + positiveRandomLong(rand)).toString(); + } + + public static Span randomSpan(Random rand) { + MilliSpan.Builder builder = new MilliSpan.Builder(); + builder.spanId( + new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand))); + builder.begin(positiveRandomLong(rand)); + 
builder.end(positiveRandomLong(rand)); + builder.description(randomString(rand)); + builder.tracerId(randomString(rand)); + int numParents = rand.nextInt(4); + SpanId[] parents = new SpanId[numParents]; + for (int i = 0; i < numParents; i++) { + parents[i] = + new SpanId(nonZeroRandomLong(rand), nonZeroRandomLong(rand)); + } + builder.parents(parents); + int numTraceInfos = rand.nextInt(4); + Map<String, String> traceInfo = new HashMap<String, String>(numTraceInfos); + for (int i = 0; i < numTraceInfos; i++) { + traceInfo.put(randomString(rand), randomString(rand)); + } + builder.traceInfo(traceInfo); + int numTimelineAnnotations = rand.nextInt(4); + List<TimelineAnnotation> timeline = + new LinkedList<TimelineAnnotation>(); + for (int i = 0; i < numTimelineAnnotations; i++) { + timeline.add(new TimelineAnnotation(positiveRandomLong(rand), + randomString(rand))); + } + builder.timeline(timeline); + return builder.build(); + } + + public static Span[] randomSpans(Random rand, int numSpans) { + Span[] spans = new Span[numSpans]; + for (int i = 0; i < spans.length; i++) { + spans[i] = randomSpan(rand); + } + return spans; + } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/client/client.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/client/client.go b/htrace-htraced/go/src/org/apache/htrace/client/client.go index 2ac8a1e..dd3f8a3 100644 --- a/htrace-htraced/go/src/org/apache/htrace/client/client.go +++ b/htrace-htraced/go/src/org/apache/htrace/client/client.go @@ -114,32 +114,13 @@ func (hcl *Client) WriteSpans(req *common.WriteSpansReq) error { func (hcl *Client) writeSpansHttp(req *common.WriteSpansReq) error { var w bytes.Buffer - var err error - for i := range req.Spans { - var buf []byte - buf, err = json.Marshal(req.Spans[i]) - if err != nil { - return errors.New(fmt.Sprintf("Error serializing span: %s", - err.Error())) 
- } - _, err = w.Write(buf) - if err != nil { - return errors.New(fmt.Sprintf("Error writing span: %s", - err.Error())) - } - _, err = w.Write([]byte{'\n'}) - //err = io.WriteString(&w, "\n") - if err != nil { - return errors.New(fmt.Sprintf("Error writing: %s", - err.Error())) - } - } - customHeaders := make(map[string]string) - if req.DefaultTrid != "" { - customHeaders["htrace-trid"] = req.DefaultTrid + enc := json.NewEncoder(&w) + err := enc.Encode(req) + if err != nil { + return errors.New(fmt.Sprintf("Error serializing span: %s", + err.Error())) } - _, _, err = hcl.makeRestRequest("POST", "writeSpans", - &w, customHeaders) + _, _, err = hcl.makeRestRequest("POST", "writeSpans", &w) if err != nil { return err } @@ -182,24 +163,19 @@ func (hcl *Client) Query(query *common.Query) ([]common.Span, error) { return spans, nil } -var EMPTY = make(map[string]string) - func (hcl *Client) makeGetRequest(reqName string) ([]byte, int, error) { - return hcl.makeRestRequest("GET", reqName, nil, EMPTY) + return hcl.makeRestRequest("GET", reqName, nil) } // Make a general JSON REST request. // Returns the request body, the response code, and the error. // Note: if the response code is non-zero, the error will also be non-zero. 
-func (hcl *Client) makeRestRequest(reqType string, reqName string, reqBody io.Reader, - customHeaders map[string]string) ([]byte, int, error) { +func (hcl *Client) makeRestRequest(reqType string, reqName string, + reqBody io.Reader) ([]byte, int, error) { url := fmt.Sprintf("http://%s/%s", hcl.restAddr, reqName) req, err := http.NewRequest(reqType, url, reqBody) req.Header.Set("Content-Type", "application/json") - for k, v := range customHeaders { - req.Header.Set(k, v) - } client := &http.Client{} resp, err := client.Do(req) if err != nil { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/common/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rest.go b/htrace-htraced/go/src/org/apache/htrace/common/rest.go deleted file mode 100644 index b367ed1..0000000 --- a/htrace-htraced/go/src/org/apache/htrace/common/rest.go +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package common - -// Info returned by /server/info -type ServerInfo struct { - // The server release version. 
- ReleaseVersion string - - // The git hash that this software was built with. - GitVersion string -} - -// Info returned by /server/stats -type ServerStats struct { - Shards []ShardStats -} - -type ShardStats struct { - Path string - - // The approximate number of spans present in this shard. This may be an - // underestimate. - ApproxNumSpans uint64 - - // leveldb.stats information - LevelDbStats string -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/common/rpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go index 28521a5..9c7bfad 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/rpc.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/rpc.go @@ -38,14 +38,39 @@ const MAX_HRPC_BODY_LENGTH = 64 * 1024 * 1024 // A request to write spans to htraced. type WriteSpansReq struct { - DefaultTrid string + DefaultTrid string `json:",omitempty"` Spans []*Span } +// Info returned by /server/info +type ServerInfo struct { + // The server release version. + ReleaseVersion string + + // The git hash that this software was built with. + GitVersion string +} + // A response to a WriteSpansReq type WriteSpansResp struct { } +// Info returned by /server/stats +type ServerStats struct { + Shards []ShardStats +} + +type ShardStats struct { + Path string + + // The approximate number of spans present in this shard. This may be an + // underestimate. 
+ ApproxNumSpans uint64 + + // leveldb.stats information + LevelDbStats string +} + // The header which is sent over the wire for HRPC type HrpcRequestHeader struct { Magic uint32 http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/common/span_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go index 9de7cee..7fb128d 100644 --- a/htrace-htraced/go/src/org/apache/htrace/common/span_test.go +++ b/htrace-htraced/go/src/org/apache/htrace/common/span_test.go @@ -92,7 +92,7 @@ func TestSpanMsgPack(t *testing.T) { End: 5678, Description: "getFileDescriptors", Parents: []SpanId{}, - TracerId: "testTracerId", + TracerId: "testTracerId", }} mh := new(codec.MsgpackHandle) mh.WriteExt = true http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/conf/config.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/conf/config.go b/htrace-htraced/go/src/org/apache/htrace/conf/config.go index b8f6c84..76af7a6 100644 --- a/htrace-htraced/go/src/org/apache/htrace/conf/config.go +++ b/htrace-htraced/go/src/org/apache/htrace/conf/config.go @@ -69,7 +69,7 @@ type Builder struct { } func getDefaultHTracedConfDir() string { - return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf"; + return PATH_SEP + "etc" + PATH_SEP + "htraced" + PATH_SEP + "conf" } func getHTracedConfDirs(dlog io.Writer) []string { @@ -78,7 +78,7 @@ func getHTracedConfDirs(dlog io.Writer) []string { if len(paths) < 1 { def := getDefaultHTracedConfDir() io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR defaulting to %s\n", def)) - return []string{ def } + return []string{def} } io.WriteString(dlog, fmt.Sprintf("HTRACED_CONF_DIR=%s\n", confDir)) return paths 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go index 8bc0c64..b5549c5 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go +++ b/htrace-htraced/go/src/org/apache/htrace/htrace/cmd.go @@ -31,8 +31,8 @@ import ( "org/apache/htrace/common" "org/apache/htrace/conf" "os" - "time" "strings" + "time" ) var RELEASE_VERSION string @@ -126,7 +126,7 @@ func main() { case serverInfo.FullCommand(): os.Exit(printServerInfo(hcl)) case serverStats.FullCommand(): - if (*serverStatsJson) { + if *serverStatsJson { os.Exit(printServerStatsJson(hcl)) } else { os.Exit(printServerStats(hcl)) @@ -195,7 +195,7 @@ func printServerStats(hcl *htrace.Client) int { } fmt.Printf("HTraced server stats:\n") fmt.Printf("%d leveldb shards.\n", len(stats.Shards)) - for i := range(stats.Shards) { + for i := range stats.Shards { shard := stats.Shards[i] fmt.Printf("==== %s ===\n", shard.Path) fmt.Printf("Approximate number of spans: %d\n", shard.ApproxNumSpans) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go index 0595d36..9fb9920 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/datastore.go @@ -953,19 +953,19 @@ func (store *dataStore) HandleQuery(query *common.Query) ([]*common.Span, error) } func (store *dataStore) ServerStats() *common.ServerStats { - serverStats := common.ServerStats { - Shards : make([]common.ShardStats, len(store.shards)), + serverStats 
:= common.ServerStats{ + Shards: make([]common.ShardStats, len(store.shards)), } - for shardIdx := range(store.shards) { + for shardIdx := range store.shards { shard := store.shards[shardIdx] serverStats.Shards[shardIdx].Path = shard.path - r := levigo.Range { - Start : append([]byte{SPAN_ID_INDEX_PREFIX}, + r := levigo.Range{ + Start: append([]byte{SPAN_ID_INDEX_PREFIX}, common.INVALID_SPAN_ID.Val()...), - Limit : append([]byte{SPAN_ID_INDEX_PREFIX + 1}, + Limit: append([]byte{SPAN_ID_INDEX_PREFIX + 1}, common.INVALID_SPAN_ID.Val()...), } - vals := shard.ldb.GetApproximateSizes([]levigo.Range { r }) + vals := shard.ldb.GetApproximateSizes([]levigo.Range{r}) serverStats.Shards[shardIdx].ApproxNumSpans = vals[0] serverStats.Shards[shardIdx].LevelDbStats = shard.ldb.PropertyValue("leveldb.stats") http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go index 97b2bca..cbfc508 100644 --- a/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-htraced/go/src/org/apache/htrace/htraced/rest.go @@ -193,37 +193,24 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } else { dec = json.NewDecoder(req.Body) } - spans := make([]*common.Span, 0, 32) - defaultTrid := req.Header.Get("htrace-trid") - for { - var span common.Span - err := dec.Decode(&span) - if err != nil { - if err != io.EOF { - writeError(hand.lg, w, http.StatusBadRequest, - fmt.Sprintf("Error parsing spans: %s", err.Error())) - return - } - break - } - spanIdProblem := span.Id.FindProblem() - if spanIdProblem != "" { - writeError(hand.lg, w, http.StatusBadRequest, - fmt.Sprintf("Invalid span ID: %s", spanIdProblem)) - return - } - if span.TracerId == "" { - span.TracerId = defaultTrid - } - spans = 
append(spans, &span) + var msg common.WriteSpansReq + err := dec.Decode(&msg) + if (err != nil) && (err != io.EOF) { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error())) + return } hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultTrid = %s\n", - len(spans), defaultTrid) - for spanIdx := range spans { + len(msg.Spans), msg.DefaultTrid) + for spanIdx := range msg.Spans { if hand.lg.DebugEnabled() { - hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson()) + hand.lg.Debugf("writing span %s\n", msg.Spans[spanIdx].ToJson()) + } + span := msg.Spans[spanIdx] + if span.TracerId == "" { + span.TracerId = msg.DefaultTrid } - hand.store.WriteSpan(spans[spanIdx]) + hand.store.WriteSpan(span) } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-htraced/pom.xml b/htrace-htraced/pom.xml index 88fd2fc..4478dc5 100644 --- a/htrace-htraced/pom.xml +++ b/htrace-htraced/pom.xml @@ -86,6 +86,10 @@ language governing permissions and limitations under the License. --> <pattern>org.eclipse.jetty</pattern> <shadedPattern>org.apache.htrace.shaded.jetty</shadedPattern> </relocation> + <dependency> + <pattern>org.msgpack</pattern> + <shadedPattern>org.apache.htrace.msgpack</shadedPattern> + </dependency> </relocations> </configuration> <goals> @@ -200,12 +204,11 @@ language governing permissions and limitations under the License. --> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> - <!-- htraced rest client deps. --> - <!--Is this too much? Pulls down jetty-http, jetty-server, jetty-io - This is new-style jetty client, jetty9 and jdk7 required. - It can do async but we will use it synchronously at first. 
- Has nice tutorial: http://www.eclipse.org/jetty/documentation/9.1.5.v20140505/http-client-api.html - --> + <dependency> + <groupId>org.msgpack</groupId> + <artifactId>msgpack-core</artifactId> + <version>0.7.0-p9</version> + </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java new file mode 100644 index 0000000..29816eb --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/BufferManager.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.io.IOException; + +import org.apache.htrace.core.Span; + +/** + * A buffer which contains span data and is able to send it over the network. + * + * BufferManager functions are not thread-safe. 
You must rely on external + * synchronization to protect buffers from concurrent operations. + */ +interface BufferManager { + /** + * Write a span to this buffer. + * + * @param span The span to write. + * + * @throws IOException If the buffer doesn't have enough space to hold the + * new span. We will not write a partial span to the + * buffer in this case. + */ + void writeSpan(Span span) throws IOException; + + /** + * Get the amount of content currently in the buffer. + */ + int contentLength(); + + /** + * Get the number of spans currently buffered. + */ + int getNumberOfSpans(); + + /** + * Prepare the buffers to be flushed. + */ + void prepare() throws IOException; + + /** + * Flush this buffer to htraced. + * + * This is a blocking operation which will not return until the buffer is + * completely flushed. + */ + void flush() throws IOException; + + /** + * Clear the data in this buffer. + */ + void clear(); + + /** + * Closes the buffer manager and frees all resources. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java new file mode 100644 index 0000000..cdd176f --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/Conf.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import org.apache.htrace.core.HTraceConfiguration; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The configuration of the HTracedSpanReceiver. + * + * This class extracts all the relevant configuration information for + * HTracedSpanReceiver from the HTraceConfiguration. It performs parsing and + * bounds-checking for the configuration keys. + * + * It is more efficient to store the configuration as final values in this + * structure than to access the HTraceConfiguration object directly. This is + * especially true when the HTraceConfiguration object is a thin shim around a + * Hadoop Configuration object, which requires synchronization to access. + */ +class Conf { + private static final Log LOG = LogFactory.getLog(Conf.class); + + /** + * Address of the htraced server. + */ + final static String ADDRESS_KEY = + "htraced.receiver.address"; + + /** + * The minimum number of milliseconds to wait for a read or write + * operation on the network. 
+ */ + final static String IO_TIMEOUT_MS_KEY = + "htraced.receiver.io.timeout.ms"; + final static int IO_TIMEOUT_MS_DEFAULT = 60000; + final static int IO_TIMEOUT_MS_MIN = 50; + + /** + * The minimum number of milliseconds to wait for a network + * connection attempt. + */ + final static String CONNECT_TIMEOUT_MS_KEY = + "htraced.receiver.connect.timeout.ms"; + final static int CONNECT_TIMEOUT_MS_DEFAULT = 60000; + final static int CONNECT_TIMEOUT_MS_MIN = 50; + + /** + * The minimum number of milliseconds to keep alive a connection when it's + * not in use. + */ + final static String IDLE_TIMEOUT_MS_KEY = + "htraced.receiver.idle.timeout.ms"; + final static int IDLE_TIMEOUT_MS_DEFAULT = 60000; + final static int IDLE_TIMEOUT_MS_MIN = 0; + + /** + * Configure the retry times to use when an attempt to flush spans to + * htraced fails. This is configured as a comma-separated list of delay + * times in milliseconds. If the configured value is empty, no retries + * will be made. + */ + final static String FLUSH_RETRY_DELAYS_KEY = + "htraced.flush.retry.delays.key"; + final static String FLUSH_RETRY_DELAYS_DEFAULT = + "1000,30000"; + + /** + * The maximum length of time to go in between flush attempts. + * Once this time elapses, a flush will be triggered even if we don't + * have that many spans buffered. + */ + final static String MAX_FLUSH_INTERVAL_MS_KEY = + "htraced.receiver.max.flush.interval.ms"; + final static int MAX_FLUSH_INTERVAL_MS_DEFAULT = 60000; + final static int MAX_FLUSH_INTERVAL_MS_MIN = 10; + + /** + * Whether or not to use msgpack for span serialization. + * If this key is false, JSON over REST will be used. + * If this key is true, msgpack over custom RPC will be used. + */ + final static String PACKED_KEY = + "htraced.receiver.packed"; + final static boolean PACKED_DEFAULT = true; + + /** + * The size of the span buffers. 
+ */ + final static String BUFFER_SIZE_KEY = + "htraced.receiver.buffer.size"; + final static int BUFFER_SIZE_DEFAULT = 48 * 1024 * 1024; + static int BUFFER_SIZE_MIN = 4 * 1024 * 1024; + // The maximum buffer size should not be longer than + // PackedBuffer.MAX_HRPC_BODY_LENGTH. + final static int BUFFER_SIZE_MAX = 63 * 1024 * 1024; + + /** + * Set the fraction of the span buffer which needs to fill up before we + * will automatically trigger a flush. This is a fraction, not a percentage. + * It is between 0 and 1. + */ + final static String BUFFER_SEND_TRIGGER_FRACTION_KEY = + "htraced.receiver.buffer.send.trigger.fraction"; + final static double BUFFER_SEND_TRIGGER_FRACTION_DEFAULT = 0.5; + final static double BUFFER_SEND_TRIGGER_FRACTION_MIN = 0.1; + + /** + * The length of time which receiveSpan should wait for a free spot in a + * span buffer before giving up and dropping the span + */ + final static String SPAN_DROP_TIMEOUT_MS_KEY = + "htraced.max.buffer.full.retry.ms.key"; + final static int SPAN_DROP_TIMEOUT_MS_DEFAULT = 5000; + + /** + * The length of time we should wait between displaying log messages on the + * rate-limited loggers. 
+ */ + final static String ERROR_LOG_PERIOD_MS_KEY = + "htraced.error.log.period.ms"; + final static long ERROR_LOG_PERIOD_MS_DEFAULT = 30000L; + + @JsonProperty("ioTimeoutMs") + final int ioTimeoutMs; + + @JsonProperty("connectTimeoutMs") + final int connectTimeoutMs; + + @JsonProperty("idleTimeoutMs") + final int idleTimeoutMs; + + @JsonProperty("flushRetryDelays") + final int[] flushRetryDelays; + + @JsonProperty("maxFlushIntervalMs") + final int maxFlushIntervalMs; + + @JsonProperty("packed") + final boolean packed; + + @JsonProperty("bufferSize") + final int bufferSize; + + @JsonProperty("spanDropTimeoutMs") + final int spanDropTimeoutMs; + + @JsonProperty("errorLogPeriodMs") + final long errorLogPeriodMs; + + @JsonProperty("triggerSize") + final int triggerSize; + + @JsonProperty("endpointStr") + final String endpointStr; + + @JsonProperty("endpoint") + final InetSocketAddress endpoint; + + private static int getBoundedInt(final HTraceConfiguration conf, + String key, int defaultValue, int minValue, int maxValue) { + int val = conf.getInt(key, defaultValue); + if (val < minValue) { + LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " + + "of " + minValue + " instead."); + return minValue; + } else if (val > maxValue) { + LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " + + "of " + maxValue + " instead."); + return maxValue; + } + return val; + } + + private static long getBoundedLong(final HTraceConfiguration conf, + String key, long defaultValue, long minValue, long maxValue) { + String strVal = conf.get(key, Long.toString(defaultValue)); + long val = 0; + try { + val = Long.parseLong(strVal); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("Bad value for '" + key + + "': should be long"); + } + if (val < minValue) { + LOG.warn("Can't set " + key + " to " + val + ". 
Using minimum value " + + "of " + minValue + " instead."); + return minValue; + } else if (val > maxValue) { + LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " + + "of " + maxValue + " instead."); + return maxValue; + } + return val; + } + + private static double getBoundedDouble(final HTraceConfiguration conf, + String key, double defaultValue, double minValue, double maxValue) { + String strVal = conf.get(key, Double.toString(defaultValue)); + double val = 0; + try { + val = Double.parseDouble(strVal); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("Bad value for '" + key + + "': should be double"); + } + if (val < minValue) { + LOG.warn("Can't set " + key + " to " + val + ". Using minimum value " + + "of " + minValue + " instead."); + return minValue; + } + if (val > maxValue) { + LOG.warn("Can't set " + key + " to " + val + ". Using maximum value " + + "of " + maxValue + " instead."); + return maxValue; + } + return val; + } + + private static int parseColonPort(String portStr) throws IOException { + int colonPosition = portStr.indexOf(':'); + if (colonPosition != 0) { + throw new IOException("Invalid port string " + portStr); + } + int port = Integer.parseInt(portStr.substring(1), 10); + if ((port < 0) || (port > 65535)) { + throw new IOException("Invalid port number " + port); + } + return port; + } + + /** + * Parse a hostname:port or ip:port pair. + * + * @param str The string to parse. + * @return The socket address. + */ + InetSocketAddress parseHostPortPair(String str) throws IOException { + str = str.trim(); + if (str.isEmpty()) { + throw new IOException("No hostname:port pair given."); + } + int bracketBegin = str.indexOf('['); + if (bracketBegin == 0) { + // Parse an ipv6-style address enclosed in square brackets. 
+ int bracketEnd = str.indexOf(']'); + if (bracketEnd < 0) { + throw new IOException("Found left bracket, but no corresponding " + + "right bracket, in " + str); + } + String host = str.substring(bracketBegin + 1, bracketEnd); + int port = parseColonPort(str.substring(bracketEnd + 1)); + return InetSocketAddress.createUnresolved(host, port); + } else if (bracketBegin > 0) { + throw new IOException("Found a left bracket that wasn't at the " + + "start of the host:port pair in " + str); + } else { + int colon = str.indexOf(':'); + if (colon <= 0) { + throw new IOException("No port component found in " + str); + } + String host = str.substring(0, colon); + int port = parseColonPort(str.substring(colon)); + return InetSocketAddress.createUnresolved(host, port); + } + } + + static int[] getIntArray(String arrayStr) { + String[] array = arrayStr.split(","); + int nonEmptyEntries = 0; + for (String str : array) { + if (!str.trim().isEmpty()) { + nonEmptyEntries++; + } + } + int[] ret = new int[nonEmptyEntries]; + int i = 0; + for (String str : array) { + if (!str.trim().isEmpty()) { + ret[i++] = Integer.parseInt(str); + } + } + return ret; + } + + Conf(HTraceConfiguration conf) throws IOException { + this.ioTimeoutMs = getBoundedInt(conf, IO_TIMEOUT_MS_KEY, + IO_TIMEOUT_MS_DEFAULT, + IO_TIMEOUT_MS_MIN, Integer.MAX_VALUE); + this.connectTimeoutMs = getBoundedInt(conf, CONNECT_TIMEOUT_MS_KEY, + CONNECT_TIMEOUT_MS_DEFAULT, + CONNECT_TIMEOUT_MS_MIN, Integer.MAX_VALUE); + this.idleTimeoutMs = getBoundedInt(conf, IDLE_TIMEOUT_MS_KEY, + IDLE_TIMEOUT_MS_DEFAULT, + IDLE_TIMEOUT_MS_MIN, Integer.MAX_VALUE); + this.flushRetryDelays = getIntArray(conf.get(FLUSH_RETRY_DELAYS_KEY, + FLUSH_RETRY_DELAYS_DEFAULT)); + this.maxFlushIntervalMs = getBoundedInt(conf, MAX_FLUSH_INTERVAL_MS_KEY, + MAX_FLUSH_INTERVAL_MS_DEFAULT, + MAX_FLUSH_INTERVAL_MS_MIN, Integer.MAX_VALUE); + this.packed = conf.getBoolean(PACKED_KEY, PACKED_DEFAULT); + this.bufferSize = getBoundedInt(conf, BUFFER_SIZE_KEY, + 
BUFFER_SIZE_DEFAULT, + BUFFER_SIZE_MIN, BUFFER_SIZE_MAX); + double triggerFraction = getBoundedDouble(conf, + BUFFER_SEND_TRIGGER_FRACTION_KEY, + BUFFER_SEND_TRIGGER_FRACTION_DEFAULT, + BUFFER_SEND_TRIGGER_FRACTION_MIN, 1.0); + this.spanDropTimeoutMs = conf.getInt(SPAN_DROP_TIMEOUT_MS_KEY, + SPAN_DROP_TIMEOUT_MS_DEFAULT); + this.errorLogPeriodMs = getBoundedLong(conf, ERROR_LOG_PERIOD_MS_KEY, + ERROR_LOG_PERIOD_MS_DEFAULT, 0, Long.MAX_VALUE); + this.triggerSize = (int)(this.bufferSize * triggerFraction); + try { + this.endpointStr = conf.get(ADDRESS_KEY, ""); + this.endpoint = parseHostPortPair(endpointStr); + } catch (IOException e) { + throw new IOException("Error reading " + ADDRESS_KEY + ": " + + e.getMessage()); + } + } + + @Override + public String toString() { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + try { + return mapper.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java deleted file mode 100644 index 643bbd5..0000000 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java +++ /dev/null @@ -1,431 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.impl; - -import java.io.IOException; -import java.net.URL; -import java.util.ArrayDeque; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.htrace.core.HTraceConfiguration; -import org.apache.htrace.core.Span; -import org.apache.htrace.core.SpanReceiver; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.StringContentProvider; -import org.eclipse.jetty.http.HttpField; -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.http.HttpMethod; -import org.eclipse.jetty.http.HttpStatus; - -/** - * A {@link SpanReceiver} that passes Spans to htraced via REST. Implementation minimizes - * dependencies and aims for small footprint since this client will be the guest of another, - * the process traced. - * - * <p>Logs via commons-logging. Uses jetty client. Jetty has its own logging. To connect, see - * jetty logging to commons-logging and see https://issues.apache.org/jira/browse/HADOOP-6807 - * and http://blogs.bytecode.com.au/glen/2005/06/21/getting-your-logging-working-in-jetty.html. 
- * - * <p>This client depends on the REST defined in <code>rest.go</code> in the htraced REST server. - * - * <p>Create an instance by doing: - * <code>SpanReceiver receiver = new HTracedRESTReceiver(conf);</code> where conf is an instance - * of {@link HTraceConfiguration}. See the public keys defined below for what we will look for in - * the configuration. For example, set {@link #HTRACED_REST_URL_KEY} if - * <code>htraced</code> is in a non-standard location. Then call - * <code>receiver.receiveSpan(Span);</code> to send spans to an htraced - * instance. This method returns immediately. It sends the spans in background. - * - * <p>TODO: Shading works? - * TODO: Add lazy start; don't start background thread till a span gets queued. - * TODO: Add some metrics; how many times we've run, how many spans and what size we've sent. - */ -public class HTracedRESTReceiver extends SpanReceiver { - private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class); - - /** - * The HttpClient to use for this receiver. - */ - private final HttpClient httpClient; - - /** - * The maximum number of spans to buffer. - */ - private final int capacity; - - /** - * REST URL to use writing Spans. - */ - private final String url; - - /** - * The maximum number of spans to send in a single PUT. - */ - private final int maxToSendAtATime; - - /** - * Runs background task to do the REST PUT. - */ - private final PostSpans postSpans; - - /** - * Thread for postSpans - */ - private final Thread postSpansThread; - - /** - * The connection timeout in milliseconds. - */ - public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms"; - private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000; - - /** - * The idle timeout in milliseconds. 
- */ - public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms"; - private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000; - - /** - * URL of the htraced REST server we are to talk to. - */ - public static final String HTRACED_REST_URL_KEY = "htraced.rest.url"; - private static final String HTRACED_REST_URL_DEFAULT = "http://localhost:9095/"; - - /** - * Maximum size of the queue to accumulate spans in. - * Cleared by the background thread that does the REST POST to htraced. - */ - public static final String CLIENT_REST_QUEUE_CAPACITY_KEY = "client.rest.queue.capacity"; - private static final int CLIENT_REST_QUEUE_CAPACITY_DEFAULT = 1000000; - - /** - * Period at which the background thread that does the REST POST to htraced in ms. - */ - public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms"; - private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000; - - /** - * Maximum spans to post to htraced at a time. - */ - public static final String CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY = - "client.rest.batch.size"; - private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100; - - /** - * Lock protecting the PostSpans data. - */ - private ReentrantLock lock = new ReentrantLock(); - - /** - * Condition variable used to wake up the PostSpans thread. - */ - private Condition cond = lock.newCondition(); - - /** - * True if we should shut down. - * Protected by the lock. - */ - private boolean shutdown = false; - - /** - * Simple bounded queue to hold spans between periodic runs of the httpclient. - * Protected by the lock. - */ - private final ArrayDeque<Span> spans; - - /** - * Keep last time we logged we were at capacity; used to prevent flooding of logs with - * "at capacity" messages. - */ - private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L); - - /** - * True if we should flush as soon as possible. Protected by the lock. 
- */ - private boolean mustStartFlush; - - /** - * Create an HttpClient instance. - * - * @param connTimeout The timeout to use for connecting. - * @param idleTimeout The idle timeout to use. - */ - HttpClient createHttpClient(long connTimeout, long idleTimeout) { - HttpClient httpClient = new HttpClient(); - httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, - this.getClass().getSimpleName())); - httpClient.setConnectTimeout(connTimeout); - httpClient.setIdleTimeout(idleTimeout); - return httpClient; - } - - /** - * Constructor. - * You must call {@link #close()} post construction when done. - * @param conf - * @throws Exception - */ - public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception { - int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY, - CLIENT_CONNECT_TIMEOUT_MS_DEFAULT); - int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY, - CLIENT_IDLE_TIMEOUT_MS_DEFAULT); - this.httpClient = createHttpClient(connTimeout, idleTimeout); - this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT); - this.spans = new ArrayDeque<Span>(capacity); - // Build up the writeSpans URL. - URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT)); - URL url = new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans"); - this.url = url.toString(); - // Period at which we run the background thread that does the REST POST to htraced. - int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT); - // Maximum spans to send in one go - this.maxToSendAtATime = - conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT); - // Start up the httpclient. - this.httpClient.start(); - // Start the background thread. 
- this.postSpans = new PostSpans(periodInMs); - this.postSpansThread = new Thread(postSpans); - this.postSpansThread.setDaemon(true); - this.postSpansThread.setName("PostSpans"); - this.postSpansThread.start(); - if (LOG.isDebugEnabled()) { - LOG.debug("Created new HTracedRESTReceiver with connTimeout=" + - connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" + - capacity + ", url=" + url + ", periodInMs=" + periodInMs + - ", maxToSendAtATime=" + maxToSendAtATime); - } - } - - /** - * POST spans runnable. - * Run on a period. Services the passed in queue taking spans and sending them to traced via http. - */ - private class PostSpans implements Runnable { - private final long periodInNs; - private final ArrayDeque<Span> spanBuf; - - private PostSpans(long periodInMs) { - this.periodInNs = TimeUnit.NANOSECONDS. - convert(periodInMs, TimeUnit.MILLISECONDS); - this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime); - } - - /** - * The span sending thread. - * - * We send a batch of spans for one of two reasons: there are already - * maxToSendAtATime spans in the buffer, or the client.rest.period.ms - * has elapsed. The idea is that we want to strike a balance between - * sending a lot of spans at a time, for efficiency purposes, and - * making sure that we don't buffer spans locally for too long. - * - * The longer we buffer spans locally, the longer we will have to wait - * to see the results of our client operations in the GUI, and the higher - * the risk of losing them if the client crashes. 
- */ - @Override - public void run() { - long waitNs; - try { - waitNs = periodInNs; - while (true) { - lock.lock(); - try { - if (shutdown) { - if (spans.isEmpty()) { - LOG.debug("Shutting down PostSpans thread..."); - break; - } - } else { - try { - waitNs = cond.awaitNanos(waitNs); - if (mustStartFlush) { - waitNs = 0; - mustStartFlush = false; - } - } catch (InterruptedException e) { - LOG.info("Got InterruptedException"); - waitNs = 0; - } - } - if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) || - shutdown) { - loadSpanBuf(); - waitNs = periodInNs; - } - } finally { - lock.unlock(); - } - // Once the lock has been safely released, we can do some network - // I/O without blocking the client process. - if (!spanBuf.isEmpty()) { - sendSpans(); - spanBuf.clear(); - } - } - } finally { - if (httpClient != null) { - try { - httpClient.stop(); - } catch (Exception e) { - LOG.error("Error shutting down httpClient", e); - } - } - spans.clear(); - } - } - - private void loadSpanBuf() { - for (int loaded = 0; loaded < maxToSendAtATime; loaded++) { - Span span = spans.pollFirst(); - if (span == null) { - return; - } - spanBuf.add(span); - } - } - - private void sendSpans() { - try { - Request request = httpClient.newRequest(url).method(HttpMethod.POST); - request.header(HttpHeader.CONTENT_TYPE, "application/json"); - StringBuilder bld = new StringBuilder(); - for (Span span : spanBuf) { - bld.append(span.toJson()); - } - request.content(new StringContentProvider(bld.toString())); - ContentResponse response = request.send(); - if (response.getStatus() == HttpStatus.OK_200) { - if (LOG.isDebugEnabled()) { - LOG.debug("POSTED " + spanBuf.size() + " spans"); - } - } else { - LOG.error("Status: " + response.getStatus()); - LOG.error(response.getHeaders()); - LOG.error(response.getContentAsString()); - } - } catch (InterruptedException e) { - LOG.error(e); - } catch (TimeoutException e) { - LOG.error(e); - } catch (ExecutionException e) { - LOG.error(e); - } - } - } - - 
@Override - public void close() throws IOException { - LOG.debug("Closing HTracedRESTReceiver(" + url + ")."); - lock.lock(); - try { - this.shutdown = true; - cond.signal(); - } finally { - lock.unlock(); - } - try { - postSpansThread.join(120000); - if (postSpansThread.isAlive()) { - LOG.error("Timed out without closing HTracedRESTReceiver(" + - url + ")."); - } else { - LOG.debug("Closed HTracedRESTReceiver(" + url + ")."); - } - } catch (InterruptedException e) { - LOG.error("Interrupted while joining postSpans", e); - } - } - - /** - * Start flushing the buffered spans. - * - * Note that even after calling this function, you will still have to wait - * for the flush to finish happening. This function just starts the flush; - * it does not block until it has completed. You also do not get - * "read-after-write consistency" with htraced... the spans that are - * written may be buffered for a short period of time prior to being - * readable. This is not a problem for production use (since htraced is not - * a database), but it means that most unit tests will need a loop in their - * "can I read what I wrote" tests. 
- */ - void startFlushing() { - LOG.info("Triggering HTracedRESTReceiver flush."); - lock.lock(); - try { - mustStartFlush = true; - cond.signal(); - } finally { - lock.unlock(); - } - } - - private static long WARN_TIMEOUT_MS = 300000; - - @Override - public void receiveSpan(Span span) { - boolean added = false; - lock.lock(); - try { - if (shutdown) { - LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " + - "is already shut down."); - return; - } - if (spans.size() < capacity) { - spans.add(span); - added = true; - if (spans.size() >= maxToSendAtATime) { - cond.signal(); - } - } else { - cond.signal(); - } - } finally { - lock.unlock(); - } - if (!added) { - long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), - TimeUnit.NANOSECONDS); - long last = lastAtCapacityWarningLog.get(); - if (now - last > WARN_TIMEOUT_MS) { - // Only log every 5 minutes. Any more than this for a guest process - // is obnoxious. - if (lastAtCapacityWarningLog.compareAndSet(last, now)) { - // If the atomic-compare-and-set succeeds, we should log. Otherwise, - // we should assume another thread already logged and bumped up the - // value of lastAtCapacityWarning sometime between our get and the - // "if" statement. - LOG.warn("There are too many HTrace spans to buffer! We have " + - "already buffered " + capacity + " spans. 
Dropping spans."); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java new file mode 100644 index 0000000..f5f493c --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedSpanReceiver.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.htrace.core.HTraceConfiguration; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanReceiver; + +/** + * The SpanReceiver which sends spans to htraced. + * + * HTracedSpanReceiver sends trace spans out to the htraced daemon, where they + * are stored and indexed. 
It supports two forms of RPC: a JSON/HTTP form, and + * an HRPC/msgpack form. We will use the msgpack form when + * htraced.receiver.packed is set to true. + * + * HTraced buffers are several megabytes in size, and we reuse them to avoid + * creating extra garbage on the heap. They are flushed whenever a timeout + * elapses, or when they get more than a configurable percent full. We allocate + * two buffers so that we can continue filling one buffer while the other is + * being sent over the wire. The buffers store serialized spans. This is + * better than storing references to span objects because it minimzes the amount + * of pointers we have to follow during a GC. Buffers are managed by instances + * of BufferManager. + */ +public class HTracedSpanReceiver extends SpanReceiver { + private static final Log LOG = LogFactory.getLog(HTracedSpanReceiver.class); + + private final static int MAX_CLOSING_WAIT_MS = 120000; + + private final FaultInjector faultInjector; + + private final Conf conf; + + private final ReentrantLock lock = new ReentrantLock(); + + private final Condition wakePostSpansThread = lock.newCondition(); + + private final BufferManager bufferManager[] = new BufferManager[2]; + + private final RateLimitedLogger flushErrorLog; + + private final RateLimitedLogger spanDropLog; + + private final PostSpansThread thread; + + private boolean shutdown = false; + + private int activeBuf = 0; + + private int flushingBuf = -1; + + private long lastBufferClearedTimeMs = 0; + + static class FaultInjector { + static FaultInjector NO_OP = new FaultInjector(); + public void handleContentLengthTrigger(int len) { } + public void handleThreadStart() throws Exception { } + public void handleFlush() throws IOException { } + } + + public HTracedSpanReceiver(HTraceConfiguration c) throws Exception { + this(c, FaultInjector.NO_OP); + } + + HTracedSpanReceiver(HTraceConfiguration c, + FaultInjector faultInjector) throws Exception { + this.faultInjector = faultInjector; + 
this.conf = new Conf(c); + if (this.conf.packed) { + for (int i = 0; i < bufferManager.length; i++) { + bufferManager[i] = new PackedBufferManager(conf); + } + } else { + for (int i = 0; i < bufferManager.length; i++) { + bufferManager[i] = new RestBufferManager(conf); + } + } + this.flushErrorLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs); + this.spanDropLog = new RateLimitedLogger(LOG, conf.errorLogPeriodMs); + this.thread = new PostSpansThread(); + LOG.debug("Created new HTracedSpanReceiver with " + conf.toString()); + } + + @Override + public void receiveSpan(Span span) { + long startTimeMs = 0; + int numTries = 1; + while (true) { + lock.lock(); + try { + if (shutdown) { + LOG.info("Unable to add span because HTracedSpanReceiver is shutting down."); + return; + } + Throwable exc = null; + try { + bufferManager[activeBuf].writeSpan(span); + int contentLength = bufferManager[activeBuf].contentLength(); + if (contentLength > conf.triggerSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering buffer #" + activeBuf + " flush because" + + " buffer contains " + contentLength + " bytes, and " + + "triggerSize is " + conf.triggerSize); + } + faultInjector.handleContentLengthTrigger(contentLength); + wakePostSpansThread.signal(); + } + return; + } catch (Exception e) { + exc = e; + } catch (Error e) { + exc = e; + } + if (startTimeMs == 0) { + startTimeMs = TimeUtil.nowMs(); + } + long deltaMs = TimeUtil.deltaMs(startTimeMs, TimeUtil.nowMs()); + if (deltaMs > conf.spanDropTimeoutMs) { + spanDropLog.error("Dropping a span after unsuccessfully " + + "attempting to add it for " + deltaMs + " ms. There is not " + + "enough buffer space. Please increase " + Conf.BUFFER_SIZE_KEY + + " or decrease the rate of spans being generated."); + return; + } else if (LOG.isDebugEnabled()) { + LOG.debug("Unable to write span to buffer #" + activeBuf + + " after " + numTries + " attempt(s) and " + deltaMs + " ms" + + ". 
Buffer already has " + + bufferManager[activeBuf].getNumberOfSpans() + " spans.", + exc); + } + numTries++; + } finally { + lock.unlock(); + } + try { + Thread.sleep(conf.spanDropTimeoutMs / 3); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @Override + public void close() { + lock.lock(); + try { + shutdown = true; + wakePostSpansThread.signal(); + } finally { + lock.unlock(); + } + try { + thread.join(MAX_CLOSING_WAIT_MS); + } catch (InterruptedException e) { + LOG.error("HTracedSpanReceiver#close was interrupted", e); + Thread.currentThread().interrupt(); + } + } + + private class PostSpansThread extends Thread { + PostSpansThread() { + this.setDaemon(true); + this.setName("PostSpans"); + this.start(); + } + + private boolean shouldWaitForCond(long timeSinceLastClearedMs) { + if (shutdown) { + // If we're shutting down, don't wait around. + LOG.trace("Should not wait for cond because we're shutting down."); + return false; + } + int contentLength = bufferManager[activeBuf].contentLength(); + if (contentLength == 0) { + // If there is nothing in the buffer, there is nothing to do. + if (LOG.isTraceEnabled()) { + LOG.trace("Should wait for cond because we have no data buffered " + + "in bufferManager " + activeBuf); + } + lastBufferClearedTimeMs = TimeUtil.nowMs(); + return true; + } else if (contentLength >= conf.triggerSize) { + // If the active buffer is filling up, start flushing. + if (LOG.isDebugEnabled()) { + LOG.debug("Should not wait for cond because we have more than " + + conf.triggerSize + " bytes buffered in bufferManager " + + activeBuf); + } + return false; + } + if (timeSinceLastClearedMs > conf.maxFlushIntervalMs) { + // If we have let the spans sit in the buffer for too long, + // start flushing. 
+ if (LOG.isTraceEnabled()) { + LOG.trace("Should not wait for cond because it has been " + + timeSinceLastClearedMs + " ms since our last flush, and we " + + "are overdue for another because maxFlushIntervalMs is " + + conf.maxFlushIntervalMs); + } + return false; + } + LOG.trace("Should wait for cond."); + return true; + } + + @Override + public void run() { + try { + faultInjector.handleThreadStart(); + LOG.debug("Starting HTracedSpanReceiver thread for " + + conf.endpointStr); + BufferManager flushBufManager = null; + while (true) { + lock.lock(); + flushingBuf = -1; + try { + while (true) { + long timeSinceLastClearedMs = TimeUtil. + deltaMs(lastBufferClearedTimeMs, TimeUtil.nowMs()); + if (!shouldWaitForCond(timeSinceLastClearedMs)) { + break; + } + long waitMs = conf.maxFlushIntervalMs - + Math.min(conf.maxFlushIntervalMs, TimeUtil. + deltaMs(TimeUtil.nowMs(), lastBufferClearedTimeMs)); + if (LOG.isTraceEnabled()) { + LOG.trace("Waiting on wakePostSpansThread for " + waitMs + + " ms."); + } + try { + wakePostSpansThread.await(waitMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("HTraceSpanReceiver thread was interrupted.", e); + throw e; + } + } + if (shutdown && (bufferManager[activeBuf].contentLength() == 0)) { + LOG.debug("PostSpansThread shutting down."); + return; + } + flushingBuf = activeBuf; + flushBufManager = bufferManager[flushingBuf]; + activeBuf = (activeBuf == 1) ? 
0 : 1; + } finally { + lock.unlock(); + } + doFlush(flushBufManager); + flushBufManager.clear(); + lastBufferClearedTimeMs = TimeUtil.nowMs(); + if (LOG.isTraceEnabled()) { + LOG.trace("setting lastBufferClearedTimeMs to " + lastBufferClearedTimeMs); + } + } + } catch (Throwable e) { + LOG.error("PostSpansThread exiting on unexpected exception", e); + } finally { + for (int i = 0; i < bufferManager.length; i++) { + bufferManager[i].close(); + } + } + } + + private void doFlush(BufferManager flushBufManager) + throws InterruptedException { + try { + flushBufManager.prepare(); + } catch (IOException e) { + LOG.error("Failed to prepare buffer containing " + + flushBufManager.getNumberOfSpans() + " spans for " + + "sending to " + conf.endpointStr + " Discarding " + + "all spans.", e); + return; + } + int flushTries = 0; + while (true) { + Throwable exc; + try { + faultInjector.handleFlush(); + flushBufManager.flush(); + exc = null; + } catch (RuntimeException e) { + exc = e; + } catch (Exception e) { + exc = e; + } + if (exc == null) { + return; + } + int numSpans = flushBufManager.getNumberOfSpans(); + String excMessage = "Failed to flush " + numSpans + " htrace " + + "spans to " + conf.endpointStr + " on try " + (flushTries + 1); + if (flushTries >= conf.flushRetryDelays.length) { + excMessage += ". 
Discarding all spans."; + } + if (LOG.isDebugEnabled()) { + LOG.error(excMessage, exc); + } else { + flushErrorLog.error(excMessage, exc); + } + if (flushTries >= conf.flushRetryDelays.length) { + return; + } + int delayMs = conf.flushRetryDelays[flushTries]; + Thread.sleep(delayMs); + flushTries++; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java new file mode 100644 index 0000000..f867ad7 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBuffer.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.htrace.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.core.MilliSpan; +import org.apache.htrace.core.TimelineAnnotation; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.core.buffer.MessageBuffer; +import org.msgpack.core.buffer.MessageBufferOutput; + +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; + +/** + * A ByteBuffer which we are writing msgpack data to. + */ +class PackedBuffer { + /** + * A MessageBufferOutput that simply outputs to a ByteBuffer. + */ + private class PackedBufferOutput implements MessageBufferOutput { + private MessageBuffer savedBuffer; + + PackedBufferOutput() { + } + + @Override + public MessageBuffer next(int bufferSize) throws IOException { + if (savedBuffer == null || savedBuffer.size() != bufferSize) { + savedBuffer = MessageBuffer.newBuffer(bufferSize); + } + MessageBuffer buffer = savedBuffer; + savedBuffer = null; + return buffer; + } + + @Override + public void flush(MessageBuffer buffer) throws IOException { + ByteBuffer b = buffer.toByteBuffer(); + bb.put(b); + savedBuffer = buffer; + } + + @Override + public void close() throws IOException { + // no-op + } + } + + private static final Log LOG = LogFactory.getLog(PackedBuffer.class); + private static final Charset UTF8 = StandardCharsets.UTF_8; + private static final byte SPANS[] = "Spans".getBytes(UTF8); + private static final byte DEFAULT_PID[] = "DefaultPid".getBytes(UTF8); + private static final byte A[] = "a".getBytes(UTF8); + private static final byte B[] = "b".getBytes(UTF8); + private static 
  /**
   * Create a new PackedBuffer.
   *
   * @param bb The ByteBuffer to use to create the packed buffer.
   */
  PackedBuffer(ByteBuffer bb) {
    this.bb = bb;
    // The packer writes through 'out', which copies into bb on flush.
    this.out = new PackedBufferOutput();
    // Scratch space sized to hold one serialized span id.
    this.temp = new byte[SPAN_ID_BYTE_LENGTH];
    this.packer = new MessagePacker(out, MSGPACK_CONF);
  }
+ */ + static void writeReqFrame(ByteBuffer bb, int methodId, long seq, int length) + throws IOException { + int oldPos = bb.position(); + boolean success = false; + try { + bb.order(ByteOrder.LITTLE_ENDIAN); + bb.putInt(HRPC_MAGIC); + bb.putInt(methodId); + bb.putLong(seq); + bb.putInt(length); + success = true; + } finally { + if (!success) { + bb.position(oldPos); + } + } + } + + /** + * Write an 8-byte value to a byte array as little-endian. + */ + private static void longToBigEndian(byte b[], int pos, long val) { + b[pos + 0] =(byte) ((val >> 56) & 0xff); + b[pos + 1] =(byte) ((val >> 48) & 0xff); + b[pos + 2] =(byte) ((val >> 40) & 0xff); + b[pos + 3] =(byte) ((val >> 32) & 0xff); + b[pos + 4] =(byte) ((val >> 24) & 0xff); + b[pos + 5] =(byte) ((val >> 16) & 0xff); + b[pos + 6] =(byte) ((val >> 8) & 0xff); + b[pos + 7] =(byte) ((val >> 0) & 0xff); + } + + private void writeSpanId(SpanId spanId) throws IOException { + longToBigEndian(temp, 0, spanId.getHigh()); + longToBigEndian(temp, 8, spanId.getLow()); + packer.packBinaryHeader(SPAN_ID_BYTE_LENGTH); + packer.writePayload(temp, 0, SPAN_ID_BYTE_LENGTH); + } + + /** + * Serialize a span to the given OutputStream. 
+ */ + void writeSpan(Span span) throws IOException { + boolean success = false; + int oldPos = bb.position(); + try { + int mapSize = 0; + if (span.getSpanId().isValid()) { + mapSize++; + } + if (span.getStartTimeMillis() != 0) { + mapSize++; + } + if (span.getStopTimeMillis() != 0) { + mapSize++; + } + if (!span.getDescription().isEmpty()) { + mapSize++; + } + if (!span.getTracerId().isEmpty()) { + mapSize++; + } + if (span.getParents().length > 0) { + mapSize++; + } + if (!span.getKVAnnotations().isEmpty()) { + mapSize++; + } + if (!span.getTimelineAnnotations().isEmpty()) { + mapSize++; + } + packer.packMapHeader(mapSize); + if (span.getSpanId().isValid()) { + packer.packRawStringHeader(1); + packer.writePayload(A); + writeSpanId(span.getSpanId()); + } + if (span.getStartTimeMillis() != 0) { + packer.packRawStringHeader(1); + packer.writePayload(B); + packer.packLong(span.getStartTimeMillis()); + } + if (span.getStopTimeMillis() != 0) { + packer.packRawStringHeader(1); + packer.writePayload(E); + packer.packLong(span.getStopTimeMillis()); + } + if (!span.getDescription().isEmpty()) { + packer.packRawStringHeader(1); + packer.writePayload(D); + packer.packString(span.getDescription()); + } + if (!span.getTracerId().isEmpty()) { + packer.packRawStringHeader(1); + packer.writePayload(R); + packer.packString(span.getTracerId()); + } + if (span.getParents().length > 0) { + packer.packRawStringHeader(1); + packer.writePayload(P); + packer.packArrayHeader(span.getParents().length); + for (int i = 0; i < span.getParents().length; i++) { + writeSpanId(span.getParents()[i]); + } + } + if (!span.getKVAnnotations().isEmpty()) { + packer.packRawStringHeader(1); + packer.writePayload(N); + Map<String, String> map = span.getKVAnnotations(); + packer.packMapHeader(map.size()); + for (Map.Entry<String, String> entry : map.entrySet()) { + packer.packString(entry.getKey()); + packer.packString(entry.getValue()); + } + } + if (!span.getTimelineAnnotations().isEmpty()) { + 
packer.packRawStringHeader(1); + packer.writePayload(T); + List<TimelineAnnotation> list = span.getTimelineAnnotations(); + packer.packArrayHeader(list.size()); + for (TimelineAnnotation annotation : list) { + packer.packMapHeader(2); + packer.packRawStringHeader(1); + packer.writePayload(T); + packer.packLong(annotation.getTime()); + packer.packRawStringHeader(1); + packer.writePayload(M); + packer.packString(annotation.getMessage()); + } + } + packer.flush(); + success = true; + } finally { + if (!success) { + // If we failed earlier, restore the old position. + // This is so that if we run out of space, we don't add a + // partial span to the buffer. + bb.position(oldPos); + } + } + } + + static SpanId readSpanId(MessageUnpacker unpacker) throws IOException { + int alen = unpacker.unpackBinaryHeader(); + if (alen != SPAN_ID_BYTE_LENGTH) { + throw new IOException("Invalid length given for spanID array. " + + "Expected " + SPAN_ID_BYTE_LENGTH + "; got " + alen); + } + byte[] payload = new byte[SPAN_ID_BYTE_LENGTH]; + unpacker.readPayload(payload); + return new SpanId( + ((payload[ 7] & 0xffL) << 0) | + ((payload[ 6] & 0xffL) << 8) | + ((payload[ 5] & 0xffL) << 16) | + ((payload[ 4] & 0xffL) << 24) | + ((payload[ 3] & 0xffL) << 32) | + ((payload[ 2] & 0xffL) << 40) | + ((payload[ 1] & 0xffL) << 48) | + ((payload[ 0] & 0xffL) << 56), + ((payload[15] & 0xffL) << 0) | + ((payload[14] & 0xffL) << 8) | + ((payload[13] & 0xffL) << 16) | + ((payload[12] & 0xffL) << 24) | + ((payload[11] & 0xffL) << 32) | + ((payload[10] & 0xffL) << 40) | + ((payload[ 9] & 0xffL) << 48) | + ((payload[ 8] & 0xffL) << 56) + ); + } + + /** + * Read a span. Used in unit tests. Not optimized. 
+ */ + static Span readSpan(MessageUnpacker unpacker) throws IOException { + int numEntries = unpacker.unpackMapHeader(); + MilliSpan.Builder builder = new MilliSpan.Builder(); + while (--numEntries >= 0) { + String key = unpacker.unpackString(); + if (key.length() != 1) { + throw new IOException("Unknown key " + key); + } + switch (key.charAt(0)) { + case 'a': + builder.spanId(readSpanId(unpacker)); + break; + case 'b': + builder.begin(unpacker.unpackLong()); + break; + case 'e': + builder.end(unpacker.unpackLong()); + break; + case 'd': + builder.description(unpacker.unpackString()); + break; + case 'r': + builder.tracerId(unpacker.unpackString()); + break; + case 'p': + int numParents = unpacker.unpackArrayHeader(); + SpanId[] parents = new SpanId[numParents]; + for (int i = 0; i < numParents; i++) { + parents[i] = readSpanId(unpacker); + } + builder.parents(parents); + break; + case 'n': + int mapEntries = unpacker.unpackMapHeader(); + HashMap<String, String> entries = + new HashMap<String, String>(mapEntries); + for (int i = 0; i < mapEntries; i++) { + String k = unpacker.unpackString(); + String v = unpacker.unpackString(); + entries.put(k, v); + } + builder.traceInfo(entries); + break; + case 't': + int listEntries = unpacker.unpackArrayHeader(); + ArrayList<TimelineAnnotation> list = + new ArrayList<TimelineAnnotation>(listEntries); + for (int i = 0; i < listEntries; i++) { + int timelineObjectSize = unpacker.unpackMapHeader(); + long time = 0; + String msg = ""; + for (int j = 0; j < timelineObjectSize; j++) { + String tlKey = unpacker.unpackString(); + if (tlKey.length() != 1) { + throw new IOException("Unknown timeline map key " + tlKey); + } + switch (tlKey.charAt(0)) { + case 't': + time = unpacker.unpackLong(); + break; + case 'm': + msg = unpacker.unpackString(); + break; + default: + throw new IOException("Unknown timeline map key " + tlKey); + } + } + list.add(new TimelineAnnotation(time, msg)); + } + builder.timeline(list); + break; + default: + 
throw new IOException("Unknown key " + key); + } + } + return builder.build(); + } + + void beginWriteSpansRequest(String defaultPid, int numSpans) + throws IOException { + boolean success = false; + int oldPos = bb.position(); + try { + int mapSize = 1; + if (defaultPid != null) { + mapSize++; + } + packer.packMapHeader(mapSize); + if (defaultPid != null) { + packer.packRawStringHeader(DEFAULT_PID.length); + packer.writePayload(DEFAULT_PID); + packer.packString(defaultPid); + } + packer.packRawStringHeader(SPANS.length); + packer.writePayload(SPANS); + packer.packArrayHeader(numSpans); + packer.flush(); + success = true; + } finally { + if (!success) { + bb.position(oldPos); + } + } + } + + /** + * Get the underlying ByteBuffer. + */ + ByteBuffer getBuffer() { + return bb; + } + + /** + * Reset our position in the array. + */ + void reset() throws IOException { + packer.reset(out); + } + + void close() { + try { + packer.close(); + } catch (IOException e) { + LOG.error("Error closing MessagePacker", e); + } + } + + public String toHexString() { + String prefix = ""; + StringBuilder bld = new StringBuilder(); + ByteBuffer b = bb.duplicate(); + b.flip(); + while (b.hasRemaining()) { + bld.append(String.format("%s%02x", prefix, b.get())); + prefix = " "; + } + return bld.toString(); + } +}
