Repository: incubator-htrace Updated Branches: refs/heads/master 9c0efd51f -> ab5837448
HTRACE-144. Include IP address in span process description (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/ab583744 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/ab583744 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/ab583744 Branch: refs/heads/master Commit: ab5837448bcfffcc288d23dbb00da46a2c1aeeaf Parents: 9c0efd5 Author: Colin P. Mccabe <[email protected]> Authored: Wed Mar 25 16:06:31 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Tue Mar 31 17:06:25 2015 -0700 ---------------------------------------------------------------------- .../go/src/org/apache/htrace/htraced/rest.go | 7 +- .../src/main/java/org/apache/htrace/Span.java | 47 ++- .../src/main/java/org/apache/htrace/Trace.java | 10 - .../src/main/java/org/apache/htrace/Tracer.java | 17 -- .../htrace/impl/LocalFileSpanReceiver.java | 6 + .../java/org/apache/htrace/impl/MilliSpan.java | 64 +++- .../java/org/apache/htrace/impl/ProcessId.java | 291 +++++++++++++++++++ .../htrace/impl/TestLocalFileSpanReceiver.java | 2 + .../org/apache/htrace/impl/TestMilliSpan.java | 9 + .../org/apache/htrace/impl/TestProcessId.java | 47 +++ .../apache/htrace/impl/FlumeSpanReceiver.java | 5 + .../apache/htrace/impl/HBaseSpanReceiver.java | 5 + .../htrace/impl/TestHBaseSpanReceiver.java | 5 + .../apache/htrace/impl/HTracedRESTReceiver.java | 10 +- .../htrace/impl/TestHTracedRESTReceiver.java | 33 ++- .../apache/htrace/impl/ZipkinSpanReceiver.java | 6 + 16 files changed, 506 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go 
b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go index 58486f1..c113a90 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go @@ -162,6 +162,7 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques setResponseHeaders(w.Header()) dec := json.NewDecoder(req.Body) spans := make([]*common.Span, 0, 32) + defaultPid := req.Header.Get("htrace-pid") for { var span common.Span err := dec.Decode(&span) @@ -173,9 +174,13 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } break } + if span.ProcessId == "" { + span.ProcessId = defaultPid + } spans = append(spans, &span) } - hand.lg.Debugf("writeSpansHandler: received %d span(s).\n", len(spans)) + hand.lg.Debugf("writeSpansHandler: received %d span(s). defaultPid = %s\n", + len(spans), defaultPid) for spanIdx := range spans { hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson()) hand.store.WriteSpan(spans[spanIdx]) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/main/java/org/apache/htrace/Span.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java index 71ed872..71164d4 100644 --- a/htrace-core/src/main/java/org/apache/htrace/Span.java +++ b/htrace-core/src/main/java/org/apache/htrace/Span.java @@ -63,7 +63,9 @@ public interface Span { boolean isRunning(); /** - * Return a textual description of this span + * Return a textual description of this span.<p/> + * + * Will never be null. */ String getDescription(); @@ -120,22 +122,32 @@ public interface Span { void addTimelineAnnotation(String msg); /** - * Get data associated with this span (read only) + * Get data associated with this span (read only)<p/> + * + * Will never be null. 
*/ Map<String, String> getKVAnnotations(); /** - * Get any timeline annotations (read only) + * Get any timeline annotations (read only)<p/> + * + * Will never be null. */ List<TimelineAnnotation> getTimelineAnnotations(); /** - * Return a unique id for the node or process from which this Span originated. - * IP address is a reasonable choice. + * Return a unique id for the process from which this Span originated.<p/> + * + * Will never be null. */ String getProcessId(); /** + * Set the process id of a span. + */ + void setProcessId(String s); + + /** * Serialize to Json */ String toJson(); @@ -145,12 +157,25 @@ public interface Span { public void serialize(Span span, JsonGenerator jgen, SerializerProvider provider) throws IOException { jgen.writeStartObject(); - jgen.writeStringField("i", String.format("%016x", span.getTraceId())); - jgen.writeStringField("s", String.format("%016x", span.getSpanId())); - jgen.writeNumberField("b", span.getStartTimeMillis()); - jgen.writeNumberField("e", span.getStopTimeMillis()); - jgen.writeStringField("d", span.getDescription()); - jgen.writeStringField("r", span.getProcessId()); + if (span.getTraceId() != 0) { + jgen.writeStringField("i", String.format("%016x", span.getTraceId())); + } + if (span.getSpanId() != 0) { + jgen.writeStringField("s", String.format("%016x", span.getSpanId())); + } + if (span.getStartTimeMillis() != 0) { + jgen.writeNumberField("b", span.getStartTimeMillis()); + } + if (span.getStopTimeMillis() != 0) { + jgen.writeNumberField("e", span.getStopTimeMillis()); + } + if (!span.getDescription().isEmpty()) { + jgen.writeStringField("d", span.getDescription()); + } + String processId = span.getProcessId(); + if (!processId.isEmpty()) { + jgen.writeStringField("r", processId); + } jgen.writeArrayFieldStart("p"); for (long parent : span.getParents()) { jgen.writeString(String.format("%016x", parent)); 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/main/java/org/apache/htrace/Trace.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/Trace.java b/htrace-core/src/main/java/org/apache/htrace/Trace.java index c7147ae..98d7563 100644 --- a/htrace-core/src/main/java/org/apache/htrace/Trace.java +++ b/htrace-core/src/main/java/org/apache/htrace/Trace.java @@ -84,7 +84,6 @@ public class Trace { traceId(tinfo.traceId). spanId(Tracer.nonZeroRandom64()). parents(new long[] { tinfo.spanId }). - processId(Tracer.getProcessId()). build(); return continueSpan(newSpan); } @@ -134,15 +133,6 @@ public class Trace { } /** - * Set the processId to be used for all Spans created by this Tracer. - * - * @see Span - */ - public static void setProcessId(String processId) { - Tracer.processId = processId; - } - - /** * Removes the given SpanReceiver from the list of SpanReceivers. */ public static void removeReceiver(SpanReceiver rcvr) { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/main/java/org/apache/htrace/Tracer.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/Tracer.java b/htrace-core/src/main/java/org/apache/htrace/Tracer.java index af2d20e..b8c4c1a 100644 --- a/htrace-core/src/main/java/org/apache/htrace/Tracer.java +++ b/htrace-core/src/main/java/org/apache/htrace/Tracer.java @@ -49,7 +49,6 @@ public class Tracer { }; public static final TraceInfo DONT_TRACE = new TraceInfo(-1, -1); private static final long EMPTY_PARENT_ARRAY[] = new long[0]; - protected static String processId = null; /** * Log a client error, and throw an exception. @@ -84,7 +83,6 @@ public class Tracer { traceId(nonZeroRandom64()). parents(EMPTY_PARENT_ARRAY). spanId(nonZeroRandom64()). - processId(getProcessId()). 
build(); } else { return parent.child(description); @@ -121,7 +119,6 @@ public class Tracer { return span; } - public TraceScope continueSpan(Span s) { Span oldCurrent = currentSpan(); setCurrentSpan(s); @@ -131,18 +128,4 @@ public class Tracer { protected int numReceivers() { return receivers.size(); } - - static String getProcessId() { - if (processId == null) { - String cmdLine = System.getProperty("sun.java.command"); - if (cmdLine != null && !cmdLine.isEmpty()) { - String fullClassName = cmdLine.split("\\s+")[0]; - String[] classParts = fullClassName.split("\\."); - cmdLine = classParts[classParts.length - 1]; - } - - processId = (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine; - } - return processId; - } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java index 07e4a81..95da72c 100644 --- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java +++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java @@ -59,6 +59,7 @@ public class LocalFileSpanReceiver implements SpanReceiver { private final FileOutputStream stream; private final FileChannel channel; private final ReentrantLock channelLock = new ReentrantLock(); + private final ProcessId processId; public LocalFileSpanReceiver(HTraceConfiguration conf) { int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT); @@ -93,6 +94,7 @@ public class LocalFileSpanReceiver implements SpanReceiver { LOG.debug("Created new LocalFileSpanReceiver with path = " + path + ", capacity = " + capacity); } + this.processId = new ProcessId(conf); } /** @@ -135,6 +137,10 @@ public class LocalFileSpanReceiver implements SpanReceiver { @Override 
public void receiveSpan(Span span) { + if (span.getProcessId().isEmpty()) { + span.setProcessId(processId.get()); + } + // Serialize the span data into a byte[]. Note that we're not holding the // lock here, to improve concurrency. byte jsonBuf[] = null; http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java index afd0202..c57eb25 100644 --- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java +++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java @@ -49,6 +49,7 @@ import java.util.Random; public class MilliSpan implements Span { private static ObjectWriter JSON_WRITER = new ObjectMapper().writer(); private static final long EMPTY_PARENT_ARRAY[] = new long[0]; + private static final String EMPTY_STRING = ""; private long begin; private long end; @@ -57,7 +58,7 @@ public class MilliSpan implements Span { private long parents[]; private final long spanId; private Map<String, String> traceInfo = null; - private final String processId; + private String processId; private List<TimelineAnnotation> timeline = null; private final static Random random = new Random(); @@ -88,12 +89,12 @@ public class MilliSpan implements Span { public static class Builder { private long begin; private long end; - private String description; + private String description = EMPTY_STRING; private long traceId; private long parents[] = EMPTY_PARENT_ARRAY; private long spanId; private Map<String, String> traceInfo = null; - private String processId; + private String processId = EMPTY_STRING; private List<TimelineAnnotation> timeline = null; public Builder() { @@ -158,6 +159,18 @@ public class MilliSpan implements Span { } } + public MilliSpan() { + this.begin = 0; + this.end = 0; + 
this.description = EMPTY_STRING; + this.traceId = 0; + this.parents = EMPTY_PARENT_ARRAY; + this.spanId = 0; + this.traceInfo = null; + this.processId = EMPTY_STRING; + this.timeline = null; + } + private MilliSpan(Builder builder) { this.begin = builder.begin; this.end = builder.end; @@ -285,6 +298,11 @@ public class MilliSpan implements Span { } @Override + public void setProcessId(String processId) { + this.processId = processId; + } + + @Override public String toJson() { StringWriter writer = new StringWriter(); try { @@ -307,18 +325,38 @@ public class MilliSpan implements Span { throws IOException, JsonProcessingException { JsonNode root = jp.getCodec().readTree(jp); Builder builder = new Builder(); - builder.begin(root.get("b").asLong()); - builder.end(root.get("e").asLong()); - builder.description(root.get("d").asText()); - builder.traceId(parseUnsignedHexLong(root.get("i").asText())); - builder.spanId(parseUnsignedHexLong(root.get("s").asText())); - builder.processId(root.get("r").asText()); + JsonNode bNode = root.get("b"); + if (bNode != null) { + builder.begin(bNode.asLong()); + } + JsonNode eNode = root.get("e"); + if (eNode != null) { + builder.end(eNode.asLong()); + } + JsonNode dNode = root.get("d"); + if (dNode != null) { + builder.description(dNode.asText()); + } + JsonNode iNode = root.get("i"); + if (iNode != null) { + builder.traceId(parseUnsignedHexLong(iNode.asText())); + } + JsonNode sNode = root.get("s"); + if (sNode != null) { + builder.spanId(parseUnsignedHexLong(sNode.asText())); + } + JsonNode rNode = root.get("r"); + if (rNode != null) { + builder.processId(rNode.asText()); + } JsonNode parentsNode = root.get("p"); LinkedList<Long> parents = new LinkedList<Long>(); - for (Iterator<JsonNode> iter = parentsNode.elements(); - iter.hasNext(); ) { - JsonNode parentIdNode = iter.next(); - parents.add(parseUnsignedHexLong(parentIdNode.asText())); + if (parentsNode != null) { + for (Iterator<JsonNode> iter = parentsNode.elements(); + 
iter.hasNext(); ) { + JsonNode parentIdNode = iter.next(); + parents.add(parseUnsignedHexLong(parentIdNode.asText())); + } } builder.parents(parents); JsonNode traceInfoNode = root.get("n"); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java b/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java new file mode 100644 index 0000000..ad2e5fc --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/impl/ProcessId.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.htrace.impl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.HTraceConfiguration; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.Locale; +import java.util.TreeSet; + +/** + * The HTrace process ID.<p/> + * + * HTrace process IDs are created from format strings. + * Format strings contain variables which the ProcessId class will + * replace with the correct values at runtime.<p/> + * + * <ul> + * <li>${ip}: will be replaced with an ip address.</li> + * <li>${pname}: will be replaced with the process name obtained from java.</li> + * </ul><p/> + * + * For example, the string "${pname}/${ip}" will be replaced with something + * like: DataNode/192.168.0.1, assuming that the process' name is DataNode + * and its IP address is 192.168.0.1.<p/> + * + * Process ID strings can contain backslashes as escapes. + * For example, "\a" will map to "a". "\${ip}" will map to the literal + * string "${ip}", not the IP address. A backslash itself can be escaped by a + * preceding backslash. + */ +public final class ProcessId { + public static final Log LOG = LogFactory.getLog(ProcessId.class); + + /** + * The configuration key to use for process id + */ + static final String PROCESS_ID_KEY = "process.id"; + + /** + * The default process ID to use if no other ID is configured. 
+ */ + private static final String DEFAULT_PROCESS_ID = "${pname}/${ip}"; + + private final String processId; + + ProcessId(String fmt) { + StringBuilder bld = new StringBuilder(); + StringBuilder varBld = null; + boolean escaping = false; + int varSeen = 0; + for (int i = 0, len = fmt.length() ; i < len; i++) { + char c = fmt.charAt(i); + if (c == '\\') { + if (!escaping) { + escaping = true; + continue; + } + } + switch (varSeen) { + case 0: + if (c == '$') { + if (!escaping) { + varSeen = 1; + continue; + } + } + escaping = false; + varSeen = 0; + bld.append(c); + break; + case 1: + if (c == '{') { + if (!escaping) { + varSeen = 2; + varBld = new StringBuilder(); + continue; + } + } + escaping = false; + varSeen = 0; + bld.append("$").append(c); + break; + default: + if (c == '}') { + if (!escaping) { + String var = varBld.toString(); + bld.append(processShellVar(var)); + varBld = null; + varSeen = 0; + continue; + } + } + escaping = false; + varBld.append(c); + varSeen++; + break; + } + } + if (varSeen > 0) { + LOG.warn("Unterminated process ID substitution variable at the end " + + "of format string " + fmt); + } + this.processId = bld.toString(); + if (LOG.isTraceEnabled()) { + LOG.trace("ProcessID(fmt=" + fmt + "): computed process ID of \"" + + this.processId + "\""); + } + } + + public ProcessId(HTraceConfiguration conf) { + this(conf.get(PROCESS_ID_KEY, DEFAULT_PROCESS_ID)); + } + + private String processShellVar(String var) { + if (var.equals("pname")) { + return getProcessName(); + } else if (var.equals("ip")) { + return getBestIpString(); + } else if (var.equals("pid")) { + return Long.valueOf(getOsPid()).toString(); + } else { + LOG.warn("unknown ProcessID variable " + var); + return ""; + } + } + + static String getProcessName() { + String cmdLine = System.getProperty("sun.java.command"); + if (cmdLine != null && !cmdLine.isEmpty()) { + String fullClassName = cmdLine.split("\\s+")[0]; + String[] classParts = fullClassName.split("\\."); + cmdLine = 
classParts[classParts.length - 1]; + } + return (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine; + } + + /** + * Get the best IP address that represents this node.<p/> + * + * This is complicated since nodes can have multiple network interfaces, + * and each network interface can have multiple IP addresses. What we're + * looking for here is an IP address that will serve to identify this node + * to HTrace. So we prefer site-local addresses (i.e. private ones on the + * LAN) to publicly routable interfaces. If there are multiple addresses + * to choose from, we select the one which comes first in textual sort + * order. This should ensure that we at least consistently call each node + * by a single name. + */ + static String getBestIpString() { + Enumeration<NetworkInterface> ifaces; + try { + ifaces = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e) { + LOG.error("Error getting network interfaces", e); + return "127.0.0.1"; + } + TreeSet<String> siteLocalCandidates = new TreeSet<String>(); + TreeSet<String> candidates = new TreeSet<String>(); + while (ifaces.hasMoreElements()) { + NetworkInterface iface = ifaces.nextElement(); + for (Enumeration<InetAddress> addrs = + iface.getInetAddresses(); addrs.hasMoreElements();) { + InetAddress addr = addrs.nextElement(); + if (!addr.isLoopbackAddress()) { + if (addr.isSiteLocalAddress()) { + siteLocalCandidates.add(addr.getHostAddress()); + } else { + candidates.add(addr.getHostAddress()); + } + } + } + } + if (!siteLocalCandidates.isEmpty()) { + return siteLocalCandidates.first(); + } + if (!candidates.isEmpty()) { + return candidates.first(); + } + return "127.0.0.1"; + } + + /** + * Get the process id from the operating system.<p/> + * + * Unfortunately, there is no simple method to get the process id in Java. + * The approach we take here is to use the shell method (see + * {ProcessId#getOsPidFromShellPpid}) unless we are on Windows, where the + * shell is not available. 
On Windows, we use + * {ProcessId#getOsPidFromManagementFactory}, which depends on some + * undocumented features of the JVM, but which doesn't require a shell. + */ + static long getOsPid() { + if ((System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH)). + contains("windows")) { + return getOsPidFromManagementFactory(); + } else { + return getOsPidFromShellPpid(); + } + } + + /** + * Get the process ID by executing a shell and printing the PPID (parent + * process ID).<p/> + * + * This method of getting the process ID doesn't depend on any undocumented + * features of the virtual machine, and should work on almost any UNIX + * operating system. + */ + private static long getOsPidFromShellPpid() { + Process p = null; + StringBuilder sb = new StringBuilder(); + try { + p = new ProcessBuilder("/usr/bin/env", "sh", "-c", "echo $PPID"). + redirectErrorStream(true).start(); + BufferedReader reader = new BufferedReader( + new InputStreamReader(p.getInputStream())); + String line = ""; + while ((line = reader.readLine()) != null) { + sb.append(line.trim()); + } + int exitVal = p.waitFor(); + if (exitVal != 0) { + throw new IOException("Process exited with error code " + + Integer.valueOf(exitVal).toString()); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while getting operating system pid from " + + "the shell.", e); + return 0L; + } catch (IOException e) { + LOG.error("Error getting operating system pid from the shell.", e); + return 0L; + } finally { + if (p != null) { + p.destroy(); + } + } + try { + return Long.parseLong(sb.toString()); + } catch (NumberFormatException e) { + LOG.error("Error parsing operating system pid from the shell.", e); + return 0L; + } + } + + /** + * Get the process ID by looking at the name of the managed bean for the + * runtime system of the Java virtual machine.<p/> + * + * Although this is undocumented, in the Oracle JVM this name is of the form + * [OS_PROCESS_ID]@[HOSTNAME]. 
+ */ + private static long getOsPidFromManagementFactory() { + try { + return Long.parseLong(ManagementFactory.getRuntimeMXBean(). + getName().split("@")[0]); + } catch (NumberFormatException e) { + LOG.error("Failed to get the operating system process ID from the name " + + "of the managed bean for the JVM.", e); + return 0L; + } + } + + public String get() { + return processId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java index 60b5430..634bef8 100644 --- a/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java +++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestLocalFileSpanReceiver.java @@ -57,6 +57,7 @@ public class TestLocalFileSpanReceiver { confMap.put(LocalFileSpanReceiver.PATH_KEY, traceFileName); confMap.put(SpanReceiverBuilder.SPAN_RECEIVER_CONF_KEY, LocalFileSpanReceiver.class.getName()); + confMap.put(ProcessId.PROCESS_ID_KEY, "testPid"); SpanReceiver rcvr = new SpanReceiverBuilder(HTraceConfiguration.fromMap(confMap)) .logErrors(false).build(); @@ -69,5 +70,6 @@ public class TestLocalFileSpanReceiver { ObjectMapper mapper = new ObjectMapper(); MilliSpan span = mapper.readValue(new File(traceFileName), MilliSpan.class); assertEquals("testWriteToLocalFile", span.getDescription()); + assertEquals("testPid", span.getProcessId()); } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java 
b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java index 857e9ac..41ee108 100644 --- a/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java +++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestMilliSpan.java @@ -144,4 +144,13 @@ public class TestMilliSpan { MilliSpan dspan = mapper.readValue(json, MilliSpan.class); compareSpans(span, dspan); } + + @Test + public void testJsonSerializationWithFieldsNotSet() throws Exception { + MilliSpan span = new MilliSpan.Builder().build(); + String json = span.toJson(); + ObjectMapper mapper = new ObjectMapper(); + MilliSpan dspan = mapper.readValue(json, MilliSpan.class); + compareSpans(span, dspan); + } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java b/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java new file mode 100644 index 0000000..9e5f6b9 --- /dev/null +++ b/htrace-core/src/test/java/org/apache/htrace/impl/TestProcessId.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.io.IOException; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class TestProcessId { + private void testProcessIdImpl(String expected, String fmt) { + assertEquals(expected, new ProcessId(fmt).get()); + } + + @Test + public void testSimpleProcessIds() { + testProcessIdImpl("abc", "abc"); + testProcessIdImpl("abc", "a\\bc"); + testProcessIdImpl("abc", "ab\\c"); + testProcessIdImpl("abc", "\\a\\b\\c"); + testProcessIdImpl("a\\bc", "a\\\\bc"); + } + + @Test + public void testSubstitutionVariables() throws IOException { + testProcessIdImpl(ProcessId.getProcessName(), "${pname}"); + testProcessIdImpl("my." + ProcessId.getProcessName(), "my.${pname}"); + testProcessIdImpl(ProcessId.getBestIpString() + ".str", "${ip}.str"); + testProcessIdImpl("${pname}", "\\${pname}"); + testProcessIdImpl("$cash$money{}", "$cash$money{}"); + testProcessIdImpl("Foo." 
+ Long.valueOf(ProcessId.getOsPid()).toString(), + "Foo.${pid}"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java index 54b8a14..baa4fa1 100644 --- a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java +++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java @@ -89,10 +89,12 @@ public class FlumeSpanReceiver implements SpanReceiver { private int maxSpanBatchSize; private String flumeHostName; private int flumePort; + private final ProcessId processId; public FlumeSpanReceiver(HTraceConfiguration conf) { this.queue = new ArrayBlockingQueue<Span>(1000); this.tf = new SimpleThreadFactory(); + this.processId = new ProcessId(conf); configure(conf); } @@ -272,6 +274,9 @@ public class FlumeSpanReceiver implements SpanReceiver { public void receiveSpan(Span span) { if (running.get()) { try { + if (span.getProcessId().isEmpty()) { + span.setProcessId(processId.get()); + } this.queue.add(span); } catch (IllegalStateException e) { LOG.error("Error trying to append span (" + http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java index 7a99366..2faf4bb 100644 --- a/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java +++ b/htrace-hbase/src/main/java/org/apache/htrace/impl/HBaseSpanReceiver.java @@ -126,6 +126,7 @@ public class HBaseSpanReceiver implements SpanReceiver { 
private final byte[] cf; private final byte[] icf; private final int maxSpanBatchSize; + private final ProcessId processId; public HBaseSpanReceiver(HTraceConfiguration conf) { this.queue = new ArrayBlockingQueue<Span>(1000); @@ -153,6 +154,7 @@ public class HBaseSpanReceiver implements SpanReceiver { for (int i = 0; i < numThreads; i++) { this.service.submit(new WriteSpanRunnable()); } + this.processId = new ProcessId(conf); } private class WriteSpanRunnable implements Runnable { @@ -331,6 +333,9 @@ public class HBaseSpanReceiver implements SpanReceiver { public void receiveSpan(Span span) { if (running.get()) { try { + if (span.getProcessId().isEmpty()) { + span.setProcessId(processId.get()); + } this.queue.add(span); } catch (IllegalStateException e) { // todo: supress repeating error logs. http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java index 7bf7bac..bf93220 100644 --- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java +++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java @@ -181,6 +181,11 @@ public class TestHBaseSpanReceiver { } @Override + public void setProcessId(String processId) { + throw new UnsupportedOperationException(); + } + + @Override public String getDescription() { return span.getDescription(); } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java 
b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java index 7edc2b8..5a4daaf 100644 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java @@ -170,6 +170,11 @@ public class HTracedRESTReceiver implements SpanReceiver { private boolean mustStartFlush; /** + * The process ID to use for all spans. + */ + private final ProcessId processId; + + /** * Create an HttpClient instance. * * @param connTimeout The timeout to use for connecting. @@ -221,6 +226,7 @@ public class HTracedRESTReceiver implements SpanReceiver { capacity + ", url=" + url + ", periodInMs=" + periodInMs + ", maxToSendAtATime=" + maxToSendAtATime); } + processId = new ProcessId(conf); } /** @@ -316,6 +322,7 @@ public class HTracedRESTReceiver implements SpanReceiver { try { Request request = httpClient.newRequest(url).method(HttpMethod.POST); request.header(HttpHeader.CONTENT_TYPE, "application/json"); + request.header("htrace-pid", processId.get()); StringBuilder bld = new StringBuilder(); for (Span span : spanBuf) { bld.append(span.toJson()); @@ -412,7 +419,8 @@ public class HTracedRESTReceiver implements SpanReceiver { lock.unlock(); } if (!added) { - long now = System.nanoTime() / 1000000L; + long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), + TimeUnit.NANOSECONDS); long last = lastAtCapacityWarningLog.get(); if (now - last > WARN_TIMEOUT_MS) { // Only log every 5 minutes. 
Any more than this for a guest process http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java index eca6d6d..9a01005 100644 --- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.net.URL; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.htrace.HTraceConfiguration; @@ -65,6 +66,7 @@ public class TestHTracedRESTReceiver { */ private final class TestHTraceConfiguration extends HTraceConfiguration { private final URL restServerUrl; + final static String PROCESS_ID = "TestHTracedRESTReceiver"; public TestHTraceConfiguration(final URL restServerUrl) { this.restServerUrl = restServerUrl; @@ -79,6 +81,8 @@ public class TestHTracedRESTReceiver { public String get(String key, String defaultValue) { if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) { return this.restServerUrl.toString(); + } else if (key.equals(ProcessId.PROCESS_ID_KEY)) { + return PROCESS_ID; } return defaultValue; } @@ -120,14 +124,21 @@ public class TestHTracedRESTReceiver { final int NUM_SPANS = 3; final HttpClient http = receiver.createHttpClient(60000, 60000); http.start(); + Span spans[] = new Span[NUM_SPANS]; + for (int i = 0; i < NUM_SPANS; i++) { + MilliSpan.Builder builder = new MilliSpan.Builder(). + parents(new long[]{1L}). 
+ spanId(i); + if (i == NUM_SPANS - 1) { + builder.processId("specialPid"); + } + spans[i] = builder.build(); + } try { for (int i = 0; i < NUM_SPANS; i++) { - Span span = new MilliSpan.Builder().parents( - new long [] {1L}).spanId(i).build(); - LOG.info(span.toString()); - receiver.receiveSpan(span); + LOG.info("receiving " + spans[i].toString()); + receiver.receiveSpan(spans[i]); } - if (testClose) { receiver.close(); } else { @@ -149,6 +160,18 @@ public class TestHTracedRESTReceiver { return false; } LOG.info("Got " + content + " for span " + i); + ObjectMapper mapper = new ObjectMapper(); + MilliSpan dspan = mapper.readValue(content, MilliSpan.class); + assertEquals((long)i, dspan.getSpanId()); + // Every span should have the process ID we set in the + // configuration... except for the last span, which had + // a custom value set. + if (i == NUM_SPANS - 1) { + assertEquals("specialPid", dspan.getProcessId()); + } else { + assertEquals(TestHTraceConfiguration.PROCESS_ID, + dspan.getProcessId()); + } } return true; } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/ab583744/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java index 06ff0ad..d75c504 100644 --- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java +++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java @@ -132,6 +132,8 @@ public class ZipkinSpanReceiver implements SpanReceiver { } }; + private final ProcessId processId; + //////////////////// /// Variables that will change on each call to configure() /////////////////// @@ -144,6 +146,7 @@ public class ZipkinSpanReceiver implements SpanReceiver { public ZipkinSpanReceiver(HTraceConfiguration conf) { this.queue = 
new ArrayBlockingQueue<Span>(1000); this.protocolFactory = new TBinaryProtocol.Factory(); + this.processId = new ProcessId(conf); configure(conf); } @@ -360,6 +363,9 @@ public class ZipkinSpanReceiver implements SpanReceiver { public void receiveSpan(Span span) { if (running.get()) { try { + if (span.getProcessId().isEmpty()) { + span.setProcessId(processId.get()); + } this.queue.add(span); } catch (IllegalStateException e) { LOG.error("Error trying to append span (" + span.getDescription() + ") to the queue."
