http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/Sampler.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/Sampler.java b/htrace-core/src/main/java/org/apache/htrace/core/Sampler.java new file mode 100644 index 0000000..91843f5 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/Sampler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +/** + * Extremely simple callback to determine the frequency that an action should be + * performed. + * <p/> + * For example, the next() function may look like this: + * <p/> + * <pre> + * <code> + * public boolean next() { + * return Math.random() > 0.5; + * } + * </code> + * </pre> + * This would trace 50% of all gets, 75% of all puts and would not trace any other requests. + */ +public interface Sampler { + public static final Sampler ALWAYS = AlwaysSampler.INSTANCE; + public static final Sampler NEVER = NeverSampler.INSTANCE; + + public boolean next(); +}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/SamplerBuilder.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/SamplerBuilder.java b/htrace-core/src/main/java/org/apache/htrace/core/SamplerBuilder.java new file mode 100644 index 0000000..5b53905 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/SamplerBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link Sampler} builder. It reads a {@link Sampler} class name from the provided + * configuration using the {@link #SAMPLER_CONF_KEY} key. Unqualified class names + * are interpreted as members of the {@code org.apache.htrace.impl} package. The {@link #build()} + * method constructs an instance of that class, initialized with the same configuration. + */ +public class SamplerBuilder { + + // TODO: should follow the same API as SpanReceiverBuilder + + public final static String SAMPLER_CONF_KEY = "sampler"; + private final static String DEFAULT_PACKAGE = "org.apache.htrace.core"; + private final static ClassLoader classLoader = + SamplerBuilder.class.getClassLoader(); + private final HTraceConfiguration conf; + private static final Log LOG = LogFactory.getLog(SamplerBuilder.class); + + public SamplerBuilder(HTraceConfiguration conf) { + this.conf = conf; + } + + public Sampler build() { + Sampler sampler = newSampler(); + if (LOG.isTraceEnabled()) { + LOG.trace("Created new sampler of type " + + sampler.getClass().getName(), new Exception()); + } + return sampler; + } + + private Sampler newSampler() { + String str = conf.get(SAMPLER_CONF_KEY); + if (str == null || str.isEmpty()) { + return NeverSampler.INSTANCE; + } + if (!str.contains(".")) { + str = DEFAULT_PACKAGE + "." + str; + } + Class cls = null; + try { + cls = classLoader.loadClass(str); + } catch (ClassNotFoundException e) { + LOG.error("SamplerBuilder cannot find sampler class " + str + + ": falling back on NeverSampler."); + return NeverSampler.INSTANCE; + } + Constructor<Sampler> ctor = null; + try { + ctor = cls.getConstructor(HTraceConfiguration.class); + } catch (NoSuchMethodException e) { + LOG.error("SamplerBuilder cannot find a constructor for class " + str + + "which takes an HTraceConfiguration. Falling back on " + + "NeverSampler."); + return NeverSampler.INSTANCE; + } + try { + return ctor.newInstance(conf); + } catch (ReflectiveOperationException e) { + LOG.error("SamplerBuilder reflection error when constructing " + str + + ". Falling back on NeverSampler.", e); + return NeverSampler.INSTANCE; + } catch (Throwable e) { + LOG.error("SamplerBuilder constructor error when constructing " + str + + ". Falling back on NeverSampler.", e); + return NeverSampler.INSTANCE; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/Span.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/Span.java b/htrace-core/src/main/java/org/apache/htrace/core/Span.java new file mode 100644 index 0000000..db1a961 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/Span.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + + +/** + * Base interface for gathering and reporting statistics about a block of + * execution. + * <p/> + * Spans should form a directed acyclic graph structure. It should be possible + * to keep following the parents of a span until you arrive at a span with no + * parents.<p/> + */ +@JsonSerialize(using = Span.SpanSerializer.class) +public interface Span { + /** + * The block has completed, stop the clock + */ + void stop(); + + /** + * Get the start time, in milliseconds + */ + long getStartTimeMillis(); + + /** + * Get the stop time, in milliseconds + */ + long getStopTimeMillis(); + + /** + * Return the total amount of time elapsed since start was called, if running, + * or difference between stop and start + */ + long getAccumulatedMillis(); + + /** + * Has the span been started and not yet stopped? + */ + boolean isRunning(); + + /** + * Return a textual description of this span.<p/> + * + * Will never be null. + */ + String getDescription(); + + /** + * A pseudo-unique (random) number assigned to this span instance.<p/> + * + * The spanId is immutable and cannot be changed. It is safe to access this + * from multiple threads. + */ + SpanId getSpanId(); + + /** + * Create a child span of this span with the given description + */ + Span child(String description); + + @Override + String toString(); + + /** + * Returns the parent IDs of the span.<p/> + * + * The array will be empty if there are no parents. + */ + SpanId[] getParents(); + + /** + * Set the parents of this span.<p/> + * + * Any existing parents will be cleared by this call. + */ + void setParents(SpanId[] parents); + + /** + * Add a data annotation associated with this span + */ + void addKVAnnotation(String key, String value); + + /** + * Add a timeline annotation associated with this span + */ + void addTimelineAnnotation(String msg); + + /** + * Get data associated with this span (read only)<p/> + * + * Will never be null. + */ + Map<String, String> getKVAnnotations(); + + /** + * Get any timeline annotations (read only)<p/> + * + * Will never be null. + */ + List<TimelineAnnotation> getTimelineAnnotations(); + + /** + * Return a unique id for the process from which this Span originated.<p/> + * + * Will never be null. + */ + String getTracerId(); + + /** + * Set the process id of a span. + */ + void setTracerId(String s); + + /** + * Serialize to Json + */ + String toJson(); + + public static class SpanSerializer extends JsonSerializer<Span> { + @Override + public void serialize(Span span, JsonGenerator jgen, SerializerProvider provider) + throws IOException { + jgen.writeStartObject(); + if (span.getSpanId().isValid()) { + jgen.writeStringField("a", span.getSpanId().toString()); + } + if (span.getStartTimeMillis() != 0) { + jgen.writeNumberField("b", span.getStartTimeMillis()); + } + if (span.getStopTimeMillis() != 0) { + jgen.writeNumberField("e", span.getStopTimeMillis()); + } + if (!span.getDescription().isEmpty()) { + jgen.writeStringField("d", span.getDescription()); + } + String tracerId = span.getTracerId(); + if (!tracerId.isEmpty()) { + jgen.writeStringField("r", tracerId); + } + jgen.writeArrayFieldStart("p"); + for (SpanId parent : span.getParents()) { + jgen.writeString(parent.toString()); + } + jgen.writeEndArray(); + Map<String, String> traceInfoMap = span.getKVAnnotations(); + if (!traceInfoMap.isEmpty()) { + jgen.writeObjectFieldStart("n"); + for (Map.Entry<String, String> e : traceInfoMap.entrySet()) { + jgen.writeStringField(e.getKey(), e.getValue()); + } + jgen.writeEndObject(); + } + List<TimelineAnnotation> timelineAnnotations = + span.getTimelineAnnotations(); + if (!timelineAnnotations.isEmpty()) { + jgen.writeArrayFieldStart("t"); + for (TimelineAnnotation tl : timelineAnnotations) { + jgen.writeStartObject(); + jgen.writeNumberField("t", tl.getTime()); + jgen.writeStringField("m", tl.getMessage()); + jgen.writeEndObject(); + } + jgen.writeEndArray(); + } + jgen.writeEndObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/SpanId.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/SpanId.java b/htrace-core/src/main/java/org/apache/htrace/core/SpanId.java new file mode 100644 index 0000000..e10f894 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/SpanId.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.math.BigInteger; +import java.lang.Void; +import java.util.concurrent.ThreadLocalRandom; +import java.util.Random; + +/** + * Uniquely identifies an HTrace span. + * + * Span IDs are 128 bits in total. The upper 64 bits of a span ID is the same + * as the upper 64 bits of the parent span, if there is one. The lower 64 bits + * are always random. + */ +public final class SpanId implements Comparable<SpanId> { + private static final int SPAN_ID_STRING_LENGTH = 32; + private final long high; + private final long low; + + /** + * The invalid span ID, which is all zeroes. + * + * It is also the "least" span ID in the sense that it is considered + * smaller than any other span ID. + */ + public static SpanId INVALID = new SpanId(0, 0); + + private static long nonZeroRand64() { + while (true) { + long r = ThreadLocalRandom.current().nextLong(); + if (r != 0) { + return r; + } + } + } + + public static SpanId fromRandom() { + return new SpanId(nonZeroRand64(), nonZeroRand64()); + } + + public static SpanId fromString(String str) { + if (str.length() != SPAN_ID_STRING_LENGTH) { + throw new RuntimeException("Invalid SpanID string: length was not " + + SPAN_ID_STRING_LENGTH); + } + long high = + ((Long.parseLong(str.substring(0, 8), 16)) << 32) | + (Long.parseLong(str.substring(8, 16), 16)); + long low = + ((Long.parseLong(str.substring(16, 24), 16)) << 32) | + (Long.parseLong(str.substring(24, 32), 16)); + return new SpanId(high, low); + } + + public SpanId(long high, long low) { + this.high = high; + this.low = low; + } + + public long getHigh() { + return high; + } + + public long getLow() { + return low; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SpanId)) { + return false; + } + SpanId other = (SpanId)o; + return ((other.high == high) && (other.low == low)); + } + + @Override + public int compareTo(SpanId other) { + int cmp = compareAsUnsigned(high, other.high); + if (cmp != 0) { + return cmp; + } + return compareAsUnsigned(low, other.low); + } + + private static int compareAsUnsigned(long a, long b) { + boolean aSign = a < 0; + boolean bSign = b < 0; + if (aSign != bSign) { + if (aSign) { + return 1; + } else { + return -1; + } + } + if (aSign) { + a = -a; + b = -b; + } + if (a < b) { + return -1; + } else if (a > b) { + return 1; + } else { + return 0; + } + } + + @Override + public int hashCode() { + return (int)((0xffffffff & (high >> 32))) ^ + (int)((0xffffffff & (high >> 0))) ^ + (int)((0xffffffff & (low >> 32))) ^ + (int)((0xffffffff & (low >> 0))); + } + + @Override + public String toString() { + return String.format("%08x%08x%08x%08x", + (0x00000000ffffffffL & (high >> 32)), + (0x00000000ffffffffL & high), + (0x00000000ffffffffL & (low >> 32)), + (0x00000000ffffffffL & low)); + } + + public boolean isValid() { + return (high != 0) || (low != 0); + } + + public SpanId newChildId() { + return new SpanId(high, nonZeroRand64()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiver.java new file mode 100644 index 0000000..5547c51 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + + +import java.io.Closeable; + + +/** + * The collector within a process that is the destination of Spans when a trace is running. + * {@code SpanReceiver} implementations are expected to provide a constructor with the signature + * <p> + * <pre> + * <code>public SpanReceiverImpl(HTraceConfiguration)</code> + * </pre> + * The helper class {@link org.apache.htrace.SpanReceiverBuilder} provides convenient factory + * methods for creating {@code SpanReceiver} instances from configuration. + * @see org.apache.htrace.SpanReceiverBuilder + */ +public interface SpanReceiver extends Closeable { + /** + * Called when a Span is stopped and can now be stored. + */ + public void receiveSpan(Span span); +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiverBuilder.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiverBuilder.java b/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiverBuilder.java new file mode 100644 index 0000000..3ab0b07 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/SpanReceiverBuilder.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.lang.reflect.Constructor; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link SpanReceiver} builder. It reads a {@link SpanReceiver} class name from the provided + * configuration using the {@link #SPAN_RECEIVER_CONF_KEY} key. Unqualified class names + * are interpreted as members of the {@code org.apache.htrace.impl} package. The {@link #build()} + * method constructs an instance of that class, initialized with the same configuration. + */ +public class SpanReceiverBuilder { + private static final Log LOG = LogFactory.getLog(SpanReceiverBuilder.class); + + public final static String SPAN_RECEIVER_CONF_KEY = "span.receiver"; + private final static String DEFAULT_PACKAGE = "org.apache.htrace.core"; + private final static ClassLoader classLoader = + SpanReceiverBuilder.class.getClassLoader(); + private final HTraceConfiguration conf; + private boolean logErrors; + private String spanReceiverClass; + + public SpanReceiverBuilder(HTraceConfiguration conf) { + this.conf = conf; + reset(); + } + + /** + * Set this builder back to defaults. Any previous calls to {@link #spanReceiverClass(String)} + * are overridden by the value provided by configuration. + * @return This instance + */ + public SpanReceiverBuilder reset() { + this.logErrors = true; + this.spanReceiverClass = this.conf.get(SPAN_RECEIVER_CONF_KEY); + return this; + } + + /** + * Override the {@code SpanReceiver} class name provided in configuration with a new value. + * @return This instance + */ + public SpanReceiverBuilder spanReceiverClass(final String spanReceiverClass) { + this.spanReceiverClass = spanReceiverClass; + return this; + } + + /** + * Configure whether we should log errors during build(). + * @return This instance + */ + public SpanReceiverBuilder logErrors(boolean logErrors) { + this.logErrors = logErrors; + return this; + } + + private void logError(String errorStr) { + if (!logErrors) { + return; + } + LOG.error(errorStr); + } + + private void logError(String errorStr, Throwable e) { + if (!logErrors) { + return; + } + LOG.error(errorStr, e); + } + + public SpanReceiver build() { + SpanReceiver spanReceiver = newSpanReceiver(); + if (LOG.isTraceEnabled()) { + LOG.trace("Created new span receiver of type " + + ((spanReceiver == null) ? "(none)" : + spanReceiver.getClass().getName())); + } + return spanReceiver; + } + + private SpanReceiver newSpanReceiver() { + if ((this.spanReceiverClass == null) || + this.spanReceiverClass.isEmpty()) { + LOG.debug("No span receiver class specified."); + return null; + } + String str = spanReceiverClass; + if (!str.contains(".")) { + str = DEFAULT_PACKAGE + "." + str; + } + Class cls = null; + try { + cls = classLoader.loadClass(str); + } catch (ClassNotFoundException e) { + logError("SpanReceiverBuilder cannot find SpanReceiver class " + str + + ": disabling span receiver."); + return null; + } + Constructor<SpanReceiver> ctor = null; + try { + ctor = cls.getConstructor(HTraceConfiguration.class); + } catch (NoSuchMethodException e) { + logError("SpanReceiverBuilder cannot find a constructor for class " + + str + "which takes an HTraceConfiguration. Disabling span " + + "receiver."); + return null; + } + try { + LOG.debug("Creating new instance of " + str + "..."); + return ctor.newInstance(conf); + } catch (ReflectiveOperationException e) { + logError("SpanReceiverBuilder reflection error when constructing " + str + + ". Disabling span receiver.", e); + return null; + } catch (Throwable e) { + logError("SpanReceiverBuilder constructor error when constructing " + str + + ". Disabling span receiver.", e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java new file mode 100644 index 0000000..b084046 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; + +/** + * Used for testing. Simply prints to standard out any spans it receives. + */ +public class StandardOutSpanReceiver implements SpanReceiver { + private static final Log LOG = LogFactory.getLog(StandardOutSpanReceiver.class); + + public StandardOutSpanReceiver(HTraceConfiguration conf) { + LOG.trace("Created new StandardOutSpanReceiver."); + } + + @Override + public void receiveSpan(Span span) { + System.out.println(span); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TimelineAnnotation.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TimelineAnnotation.java b/htrace-core/src/main/java/org/apache/htrace/core/TimelineAnnotation.java new file mode 100644 index 0000000..18de061 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TimelineAnnotation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +public class TimelineAnnotation { + private final long time; + private final String msg; + + public TimelineAnnotation(long time, String msg) { + this.time = time; + this.msg = msg; + } + + public long getTime() { + return time; + } + + public String getMessage() { + return msg; + } + + @Override + public String toString() { + return "@" + time + ": " + msg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/Trace.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/Trace.java b/htrace-core/src/main/java/org/apache/htrace/core/Trace.java new file mode 100644 index 0000000..9b72afe --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/Trace.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.Callable; + +/** + * The Trace class is the primary way to interact with the library. It provides + * methods to create and manipulate spans. + * + * A 'Span' represents a length of time. It has many other attributes such as a + * description, ID, and even potentially a set of key/value strings attached to + * it. + * + * Each thread in your application has a single currently active currentSpan + * associated with it. When this is non-null, it represents the current + * operation that the thread is doing. Spans are NOT thread-safe, and must + * never be used by multiple threads at once. With care, it is possible to + * safely pass a Span object between threads, but in most cases this is not + * necessary. + * + * A 'TraceScope' can either be empty, or contain a Span. TraceScope objects + * implement the Java's Closeable interface. Similar to file descriptors, they + * must be closed after they are created. When a TraceScope contains a Span, + * this span is closed when the scope is closed. + * + * The 'startSpan' methods in this class do a few things: + * <ul> + * <li>Create a new Span which has this thread's currentSpan as one of its parents.</li> + * <li>Set currentSpan to the new Span.</li> + * <li>Create a TraceSpan object to manage the new Span.</li> + * </ul> + * + * Closing a TraceScope does a few things: + * <ul> + * <li>It closes the span which the scope was managing.</li> + * <li>Set currentSpan to the previous currentSpan (which may be null).</li> + * </ul> + */ +public class Trace { + private static final Log LOG = LogFactory.getLog(Trace.class); + + /** + * Creates a new trace scope. + * + * If this thread has a currently active trace span, the trace scope we create + * here will contain a new span descending from the currently active span. + * If there is no currently active trace span, the trace scope we create will + * be empty. + * + * @param description The description field for the new span to create. + */ + public static TraceScope startSpan(String description) { + return startSpan(description, NeverSampler.INSTANCE); + } + + public static TraceScope startSpan(String description, SpanId parentId) { + if (parentId == null) { + return continueSpan(null); + } + Span newSpan = new MilliSpan.Builder(). + begin(System.currentTimeMillis()). + end(0). + description(description). + spanId(parentId.newChildId()). + parents(new SpanId[] { parentId }). + build(); + return continueSpan(newSpan); + } + + /** + * Creates a new trace scope. + * + * If this thread has a currently active trace span, it must be the 'parent' + * span that you pass in here as a parameter. The trace scope we create here + * will contain a new span which is a child of 'parent'. + * + * @param description The description field for the new span to create. + */ + public static TraceScope startSpan(String description, Span parent) { + if (parent == null) { + return startSpan(description); + } + Span currentSpan = currentSpan(); + if ((currentSpan != null) && (currentSpan != parent)) { + Tracer.clientError("HTrace client error: thread " + + Thread.currentThread().getName() + " tried to start a new Span " + + "with parent " + parent.toString() + ", but there is already a " + + "currentSpan " + currentSpan); + } + return continueSpan(parent.child(description)); + } + + public static <T> TraceScope startSpan(String description, Sampler s) { + Span span = null; + if (isTracing() || s.next()) { + span = Tracer.getInstance().createNew(description); + } + return continueSpan(span); + } + + /** + * Pick up an existing span from another thread. + */ + public static TraceScope continueSpan(Span s) { + // Return an empty TraceScope that does nothing on close + if (s == null) return NullScope.INSTANCE; + return Tracer.getInstance().continueSpan(s); + } + + /** + * Removes the given SpanReceiver from the list of SpanReceivers. + */ + public static void removeReceiver(SpanReceiver rcvr) { + Tracer.getInstance().removeReceiver(rcvr); + } + + /** + * Adds the given SpanReceiver to the current Tracer instance's list of + * SpanReceivers. + */ + public static void addReceiver(SpanReceiver rcvr) { + Tracer.getInstance().addReceiver(rcvr); + } + + /** + * Adds a data annotation to the current span if tracing is currently on. + */ + public static void addKVAnnotation(String key, String value) { + Span s = currentSpan(); + if (s != null) { + s.addKVAnnotation(key, value); + } + } + + /** + * Annotate the current span with the given message. + */ + public static void addTimelineAnnotation(String msg) { + Span s = currentSpan(); + if (s != null) { + s.addTimelineAnnotation(msg); + } + } + + /** + * Returns true if the current thread is a part of a trace, false otherwise. + */ + public static boolean isTracing() { + return Tracer.getInstance().isTracing(); + } + + /** + * If we are tracing, return the current span, else null + * + * @return Span representing the current trace, or null if not tracing. + */ + public static Span currentSpan() { + return Tracer.getInstance().currentSpan(); + } + + /** + * Wrap the callable in a TraceCallable, if tracing. + * + * @return The callable provided, wrapped if tracing, 'callable' if not. + */ + public static <V> Callable<V> wrap(Callable<V> callable) { + if (isTracing()) { + return new TraceCallable<V>(Trace.currentSpan(), callable); + } else { + return callable; + } + } + + /** + * Wrap the runnable in a TraceRunnable, if tracing + * + * @return The runnable provided, wrapped if tracing, 'runnable' if not. + */ + public static Runnable wrap(Runnable runnable) { + if (isTracing()) { + return new TraceRunnable(Trace.currentSpan(), runnable); + } else { + return runnable; + } + } + + /** + * Wrap the runnable in a TraceRunnable, if tracing + * + * @param description name of the span to be created. + * @param runnable The runnable that will have tracing info associated with it if tracing. + * @return The runnable provided, wrapped if tracing, 'runnable' if not. + */ + public static Runnable wrap(String description, Runnable runnable) { + if (isTracing()) { + return new TraceRunnable(Trace.currentSpan(), runnable, description); + } else { + return runnable; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TraceCallable.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TraceCallable.java b/htrace-core/src/main/java/org/apache/htrace/core/TraceCallable.java new file mode 100644 index 0000000..08bcace --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TraceCallable.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.Callable; + +/** + * Wrap a Callable with a Span that survives a change in threads. + */ +public class TraceCallable<V> implements Callable<V> { + private final Callable<V> impl; + private final Span parent; + private final String description; + + public TraceCallable(Callable<V> impl) { + this(Trace.currentSpan(), impl); + } + + public TraceCallable(Span parent, Callable<V> impl) { + this(parent, impl, null); + } + + public TraceCallable(Span parent, Callable<V> impl, String description) { + this.impl = impl; + this.parent = parent; + this.description = description; + } + + @Override + public V call() throws Exception { + if (parent != null) { + TraceScope chunk = Trace.startSpan(getDescription(), parent); + + try { + return impl.call(); + } finally { + chunk.close(); + } + } else { + return impl.call(); + } + } + + public Callable<V> getImpl() { + return impl; + } + + private String getDescription() { + return this.description == null ? Thread.currentThread().getName() : description; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TraceExecutorService.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core/src/main/java/org/apache/htrace/core/TraceExecutorService.java new file mode 100644 index 0000000..8519d04 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TraceExecutorService.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +public class TraceExecutorService implements ExecutorService { + + private final ExecutorService impl; + + public TraceExecutorService(ExecutorService impl) { + this.impl = impl; + } + + @Override + public void execute(Runnable command) { + impl.execute(new TraceRunnable(command)); + } + + @Override + public void shutdown() { + impl.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return impl.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return impl.isShutdown(); + } + + @Override + public boolean isTerminated() { + return impl.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return impl.awaitTermination(timeout, unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return impl.submit(new TraceCallable<T>(task)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return impl.submit(new TraceRunnable(task), result); + } + + @Override + public Future<?> submit(Runnable task) { + return impl.submit(new TraceRunnable(task)); + } + + private <T> Collection<? extends Callable<T>> wrapCollection( + Collection<? extends Callable<T>> tasks) { + List<Callable<T>> result = new ArrayList<Callable<T>>(); + for (Callable<T> task : tasks) { + result.add(new TraceCallable<T>(task)); + } + return result; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return impl.invokeAll(wrapCollection(tasks)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + return impl.invokeAll(wrapCollection(tasks), timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return impl.invokeAny(wrapCollection(tasks)); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException { + return impl.invokeAny(wrapCollection(tasks), timeout, unit); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TraceProxy.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TraceProxy.java b/htrace-core/src/main/java/org/apache/htrace/core/TraceProxy.java new file mode 100644 index 0000000..de9c980 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TraceProxy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +public class TraceProxy { + /** + * Returns an object that will trace all calls to itself. + */ + public static <T> T trace(T instance) { + return trace(instance, Sampler.ALWAYS); + } + + /** + * Returns an object that will trace all calls to itself. + */ + @SuppressWarnings("unchecked") + public static <T, V> T trace(final T instance, final Sampler sampler) { + InvocationHandler handler = new InvocationHandler() { + @Override + public Object invoke(Object obj, Method method, Object[] args) + throws Throwable { + if (!sampler.next()) { + return method.invoke(instance, args); + } + + TraceScope scope = Trace.startSpan(method.getName(), Sampler.ALWAYS); + try { + return method.invoke(instance, args); + } catch (Throwable ex) { + ex.printStackTrace(); + throw ex; + } finally { + scope.close(); + } + } + }; + return (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), + instance.getClass().getInterfaces(), handler); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TraceRunnable.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TraceRunnable.java b/htrace-core/src/main/java/org/apache/htrace/core/TraceRunnable.java new file mode 100644 index 0000000..6accea9 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TraceRunnable.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +/** + * Wrap a Runnable with a Span that survives a change in threads. + */ +public class TraceRunnable implements Runnable { + + private final Span parent; + private final Runnable runnable; + private final String description; + + public TraceRunnable(Runnable runnable) { + this(Trace.currentSpan(), runnable); + } + + public TraceRunnable(Span parent, Runnable runnable) { + this(parent, runnable, null); + } + + public TraceRunnable(Span parent, Runnable runnable, String description) { + this.parent = parent; + this.runnable = runnable; + this.description = description; + } + + @Override + public void run() { + if (parent != null) { + TraceScope chunk = Trace.startSpan(getDescription(), parent); + + try { + runnable.run(); + } finally { + chunk.close(); + } + } else { + runnable.run(); + } + } + + private String getDescription() { + return this.description == null ? Thread.currentThread().getName() : description; + } + + public Runnable getRunnable() { + return runnable; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TraceScope.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TraceScope.java b/htrace-core/src/main/java/org/apache/htrace/core/TraceScope.java new file mode 100644 index 0000000..f41e720 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TraceScope.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.io.Closeable; +import java.lang.Thread; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class TraceScope implements Closeable { + private static final Log LOG = LogFactory.getLog(TraceScope.class); + + /** + * the span for this scope + */ + private final Span span; + + /** + * the span that was "current" before this scope was entered + */ + private final Span savedSpan; + + private boolean detached = false; + + TraceScope(Span span, Span saved) { + this.span = span; + this.savedSpan = saved; + } + + public Span getSpan() { + return span; + } + + /** + * Remove this span as the current thread, but don't stop it yet or + * send it for collection. This is useful if the span object is then + * passed to another thread for use with Trace.continueTrace(). + * + * @return the same Span object + */ + public Span detach() { + if (detached) { + Tracer.clientError("Tried to detach trace span " + span + " but " + + "it has already been detached."); + } + detached = true; + + Span cur = Tracer.getInstance().currentSpan(); + if (cur != span) { + Tracer.clientError("Tried to detach trace span " + span + " but " + + "it is not the current span for the " + + Thread.currentThread().getName() + " thread. You have " + + "probably forgotten to close or detach " + cur); + } else { + Tracer.getInstance().setCurrentSpan(savedSpan); + } + return span; + } + + /** + * Return true when {@link #detach()} has been called. Helpful when debugging + * multiple threads working on a single span. + */ + public boolean isDetached() { + return detached; + } + + @Override + public void close() { + if (detached) { + return; + } + detached = true; + Span cur = Tracer.getInstance().currentSpan(); + if (cur != span) { + Tracer.clientError("Tried to close trace span " + span + " but " + + "it is not the current span for the " + + Thread.currentThread().getName() + " thread. You have " + + "probably forgotten to close or detach " + cur); + } else { + span.stop(); + Tracer.getInstance().setCurrentSpan(savedSpan); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/Tracer.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/Tracer.java b/htrace-core/src/main/java/org/apache/htrace/core/Tracer.java new file mode 100644 index 0000000..b2ef6e6 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/Tracer.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A Tracer provides the implementation for collecting and distributing Spans + * within a process. + */ +public class Tracer { + private static final Log LOG = LogFactory.getLog(Tracer.class); + + static long nonZeroRandom64() { + long id; + Random random = ThreadLocalRandom.current(); + do { + id = random.nextLong(); + } while (id == 0); + return id; + } + + private final List<SpanReceiver> receivers = new CopyOnWriteArrayList<SpanReceiver>(); + private static final ThreadLocal<Span> currentSpan = new ThreadLocal<Span>() { + @Override + protected Span initialValue() { + return null; + } + }; + private static final SpanId EMPTY_PARENT_ARRAY[] = new SpanId[0]; + + /** + * Log a client error, and throw an exception. + * + * @param str The message to use in the log and the exception. + */ + static void clientError(String str) { + LOG.error(str); + throw new RuntimeException(str); + } + + /** + * Internal class for defered singleton idiom. + * <p/> + * https://en.wikipedia.org/wiki/Initialization_on_demand_holder_idiom + */ + private static class TracerHolder { + private static final Tracer INSTANCE = new Tracer(); + } + + public static Tracer getInstance() { + return TracerHolder.INSTANCE; + } + + protected Span createNew(String description) { + Span parent = currentSpan.get(); + if (parent == null) { + return new MilliSpan.Builder(). + begin(System.currentTimeMillis()). + end(0). + description(description). + parents(EMPTY_PARENT_ARRAY). + spanId(SpanId.fromRandom()). + build(); + } else { + return parent.child(description); + } + } + + protected boolean isTracing() { + return currentSpan.get() != null; + } + + protected Span currentSpan() { + return currentSpan.get(); + } + + public void deliver(Span span) { + for (SpanReceiver receiver : receivers) { + receiver.receiveSpan(span); + } + } + + protected void addReceiver(SpanReceiver receiver) { + receivers.add(receiver); + } + + protected void removeReceiver(SpanReceiver receiver) { + receivers.remove(receiver); + } + + protected Span setCurrentSpan(Span span) { + if (LOG.isTraceEnabled()) { + LOG.trace("setting current span " + span); + } + currentSpan.set(span); + return span; + } + + public TraceScope continueSpan(Span s) { + Span oldCurrent = currentSpan(); + setCurrentSpan(s); + return new TraceScope(s, oldCurrent); + } + + protected int numReceivers() { + return receivers.size(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/core/TracerId.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/core/TracerId.java b/htrace-core/src/main/java/org/apache/htrace/core/TracerId.java new file mode 100644 index 0000000..7cdbd34 --- /dev/null +++ b/htrace-core/src/main/java/org/apache/htrace/core/TracerId.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.Locale; +import java.util.TreeSet; + +/** + * The HTrace tracer ID.<p/> + * + * HTrace tracer IDs are created from format strings. + * Format strings contain variables which the TracerId class will + * replace with the correct values at runtime.<p/> + * + * <ul> + * <li>${ip}: will be replaced with an ip address.</li> + * <li>${pname}: will be replaced the process name obtained from java.</li> + * </ul><p/> + * + * For example, the string "${pname}/${ip}" will be replaced with something + * like: DataNode/192.168.0.1, assuming that the process' name is DataNode + * and its IP address is 192.168.0.1.<p/> + * + * Process ID strings can contain backslashes as escapes. + * For example, "\a" will map to "a". "\${ip}" will map to the literal + * string "${ip}", not the IP address. A backslash itself can be escaped by a + * preceding backslash. + */ +public final class TracerId { + private static final Log LOG = LogFactory.getLog(TracerId.class); + + /** + * The configuration key to use for process id + */ + public static final String TRACER_ID_KEY = "process.id"; + + /** + * The default process ID to use if no other ID is configured. + */ + private static final String DEFAULT_TRACER_ID = "${pname}/${ip}"; + + private final String tracerId; + + TracerId(String fmt) { + StringBuilder bld = new StringBuilder(); + StringBuilder varBld = null; + boolean escaping = false; + int varSeen = 0; + for (int i = 0, len = fmt.length() ; i < len; i++) { + char c = fmt.charAt(i); + if (c == '\\') { + if (!escaping) { + escaping = true; + continue; + } + } + switch (varSeen) { + case 0: + if (c == '$') { + if (!escaping) { + varSeen = 1; + continue; + } + } + escaping = false; + varSeen = 0; + bld.append(c); + break; + case 1: + if (c == '{') { + if (!escaping) { + varSeen = 2; + varBld = new StringBuilder(); + continue; + } + } + escaping = false; + varSeen = 0; + bld.append("$").append(c); + break; + default: + if (c == '}') { + if (!escaping) { + String var = varBld.toString(); + bld.append(processShellVar(var)); + varBld = null; + varSeen = 0; + continue; + } + } + escaping = false; + varBld.append(c); + varSeen++; + break; + } + } + if (varSeen > 0) { + LOG.warn("Unterminated process ID substitution variable at the end " + + "of format string " + fmt); + } + this.tracerId = bld.toString(); + if (LOG.isTraceEnabled()) { + LOG.trace("ProcessID(fmt=" + fmt + "): computed process ID of \"" + + this.tracerId + "\""); + } + } + + public TracerId(HTraceConfiguration conf) { + this(conf.get(TRACER_ID_KEY, DEFAULT_TRACER_ID)); + } + + private String processShellVar(String var) { + if (var.equals("pname")) { + return getProcessName(); + } else if (var.equals("ip")) { + return getBestIpString(); + } else if (var.equals("pid")) { + return Long.valueOf(getOsPid()).toString(); + } else { + LOG.warn("unknown ProcessID variable " + var); + return ""; + } + } + + static String getProcessName() { + String cmdLine = System.getProperty("sun.java.command"); + if (cmdLine != null && !cmdLine.isEmpty()) { + String fullClassName = cmdLine.split("\\s+")[0]; + String[] classParts = fullClassName.split("\\."); + cmdLine = classParts[classParts.length - 1]; + } + return (cmdLine == null || cmdLine.isEmpty()) ? "Unknown" : cmdLine; + } + + /** + * Get the best IP address that represents this node.<p/> + * + * This is complicated since nodes can have multiple network interfaces, + * and each network interface can have multiple IP addresses. What we're + * looking for here is an IP address that will serve to identify this node + * to HTrace. So we prefer site-local addresess (i.e. private ones on the + * LAN) to publicly routable interfaces. If there are multiple addresses + * to choose from, we select the one which comes first in textual sort + * order. This should ensure that we at least consistently call each node + * by a single name. + */ + static String getBestIpString() { + Enumeration<NetworkInterface> ifaces; + try { + ifaces = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e) { + LOG.error("Error getting network interfaces", e); + return "127.0.0.1"; + } + TreeSet<String> siteLocalCandidates = new TreeSet<String>(); + TreeSet<String> candidates = new TreeSet<String>(); + while (ifaces.hasMoreElements()) { + NetworkInterface iface = ifaces.nextElement(); + for (Enumeration<InetAddress> addrs = + iface.getInetAddresses(); addrs.hasMoreElements();) { + InetAddress addr = addrs.nextElement(); + if (!addr.isLoopbackAddress()) { + if (addr.isSiteLocalAddress()) { + siteLocalCandidates.add(addr.getHostAddress()); + } else { + candidates.add(addr.getHostAddress()); + } + } + } + } + if (!siteLocalCandidates.isEmpty()) { + return siteLocalCandidates.first(); + } + if (!candidates.isEmpty()) { + return candidates.first(); + } + return "127.0.0.1"; + } + + /** + * Get the process id from the operating system.<p/> + * + * Unfortunately, there is no simple method to get the process id in Java. + * The approach we take here is to use the shell method (see + * {TracerId#getOsPidFromShellPpid}) unless we are on Windows, where the + * shell is not available. On Windows, we use + * {TracerId#getOsPidFromManagementFactory}, which depends on some + * undocumented features of the JVM, but which doesn't require a shell. + */ + static long getOsPid() { + if ((System.getProperty("os.name", "generic").toLowerCase(Locale.ENGLISH)). + contains("windows")) { + return getOsPidFromManagementFactory(); + } else { + return getOsPidFromShellPpid(); + } + } + + /** + * Get the process ID by executing a shell and printing the PPID (parent + * process ID).<p/> + * + * This method of getting the process ID doesn't depend on any undocumented + * features of the virtual machine, and should work on almost any UNIX + * operating system. + */ + private static long getOsPidFromShellPpid() { + Process p = null; + StringBuilder sb = new StringBuilder(); + try { + p = new ProcessBuilder("/usr/bin/env", "sh", "-c", "echo $PPID"). + redirectErrorStream(true).start(); + BufferedReader reader = new BufferedReader( + new InputStreamReader(p.getInputStream())); + String line = ""; + while ((line = reader.readLine()) != null) { + sb.append(line.trim()); + } + int exitVal = p.waitFor(); + if (exitVal != 0) { + throw new IOException("Process exited with error code " + + Integer.valueOf(exitVal).toString()); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while getting operating system pid from " + + "the shell.", e); + return 0L; + } catch (IOException e) { + LOG.error("Error getting operating system pid from the shell.", e); + return 0L; + } finally { + if (p != null) { + p.destroy(); + } + } + try { + return Long.parseLong(sb.toString()); + } catch (NumberFormatException e) { + LOG.error("Error parsing operating system pid from the shell.", e); + return 0L; + } + } + + /** + * Get the process ID by looking at the name of the managed bean for the + * runtime system of the Java virtual machine.<p/> + * + * Although this is undocumented, in the Oracle JVM this name is of the form + * [OS_PROCESS_ID]@[HOSTNAME]. + */ + private static long getOsPidFromManagementFactory() { + try { + return Long.parseLong(ManagementFactory.getRuntimeMXBean(). + getName().split("@")[0]); + } catch (NumberFormatException e) { + LOG.error("Failed to get the operating system process ID from the name " + + "of the managed bean for the JVM.", e); + return 0L; + } + } + + public String get() { + return tracerId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/impl/AlwaysSampler.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/AlwaysSampler.java b/htrace-core/src/main/java/org/apache/htrace/impl/AlwaysSampler.java deleted file mode 100644 index 699f970..0000000 --- a/htrace-core/src/main/java/org/apache/htrace/impl/AlwaysSampler.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.impl; - -import org.apache.htrace.HTraceConfiguration; -import org.apache.htrace.Sampler; - -/** - * A Sampler that always returns true. - */ -public final class AlwaysSampler implements Sampler { - - public static final AlwaysSampler INSTANCE = new AlwaysSampler(null); - - public AlwaysSampler(HTraceConfiguration conf) { - } - - @Override - public boolean next() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/impl/CountSampler.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/CountSampler.java b/htrace-core/src/main/java/org/apache/htrace/impl/CountSampler.java deleted file mode 100644 index e59a4ba..0000000 --- a/htrace-core/src/main/java/org/apache/htrace/impl/CountSampler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.impl; - -import org.apache.htrace.HTraceConfiguration; -import org.apache.htrace.Sampler; - -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Sampler that returns true every N calls. Specify the frequency interval by configuring a - * {@code long} value for {@link #SAMPLER_FREQUENCY_CONF_KEY}. - */ -public class CountSampler implements Sampler { - public final static String SAMPLER_FREQUENCY_CONF_KEY = "sampler.frequency"; - - final long frequency; - long count = ThreadLocalRandom.current().nextLong(); - - public CountSampler(HTraceConfiguration conf) { - this.frequency = Long.parseLong(conf.get(SAMPLER_FREQUENCY_CONF_KEY), 10); - } - - @Override - public boolean next() { - return (count++ % frequency) == 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fd889b65/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java deleted file mode 100644 index dfb701d..0000000 --- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.impl; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.htrace.HTraceConfiguration; -import org.apache.htrace.Span; -import org.apache.htrace.SpanReceiver; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.FileSystems; -import java.nio.file.StandardOpenOption; -import java.util.UUID; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Writes the spans it receives to a local file. - */ -public class LocalFileSpanReceiver implements SpanReceiver { - private static final Log LOG = LogFactory.getLog(LocalFileSpanReceiver.class); - public static final String PATH_KEY = "local-file-span-receiver.path"; - public static final String CAPACITY_KEY = "local-file-span-receiver.capacity"; - public static final int CAPACITY_DEFAULT = 5000; - private static ObjectWriter JSON_WRITER = new ObjectMapper().writer(); - private final String path; - - private byte[][] bufferedSpans; - private int bufferedSpansIndex; - private final ReentrantLock bufferLock = new ReentrantLock(); - - private final FileOutputStream stream; - private final FileChannel channel; - private final ReentrantLock channelLock = new ReentrantLock(); - private final TracerId tracerId; - - public LocalFileSpanReceiver(HTraceConfiguration conf) { - int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT); - if (capacity < 1) { - throw new IllegalArgumentException(CAPACITY_KEY + " must not be " + - "less than 1."); - } - this.path = conf.get(PATH_KEY); - if (path == null || path.isEmpty()) { - throw new IllegalArgumentException("must configure " + PATH_KEY); - } - boolean success = false; - try { - this.stream = new FileOutputStream(path, true); - } catch (IOException ioe) { - LOG.error("Error opening " + path + ": " + ioe.getMessage()); - throw new RuntimeException(ioe); - } - this.channel = stream.getChannel(); - if (this.channel == null) { - try { - this.stream.close(); - } catch (IOException e) { - LOG.error("Error closing " + path, e); - } - LOG.error("Failed to get channel for " + path); - throw new RuntimeException("Failed to get channel for " + path); - } - this.bufferedSpans = new byte[capacity][]; - this.bufferedSpansIndex = 0; - if (LOG.isDebugEnabled()) { - LOG.debug("Created new LocalFileSpanReceiver with path = " + path + - ", capacity = " + capacity); - } - this.tracerId = new TracerId(conf); - } - - /** - * Number of buffers to use in FileChannel#write. - * - * On UNIX, FileChannel#write uses writev-- a kernel interface that allows - * us to send multiple buffers at once. This is more efficient than making a - * separate write call for each buffer, since it minimizes the number of - * transitions from userspace to kernel space. - */ - private final int WRITEV_SIZE = 20; - - private final static ByteBuffer newlineBuf = - ByteBuffer.wrap(new byte[] { (byte)0xa }); - - /** - * Flushes a bufferedSpans array. - */ - private void doFlush(byte[][] toFlush, int len) throws IOException { - int bidx = 0, widx = 0; - ByteBuffer writevBufs[] = new ByteBuffer[2 * WRITEV_SIZE]; - - while (true) { - if (widx == writevBufs.length) { - channel.write(writevBufs); - widx = 0; - } - if (bidx == len) { - break; - } - writevBufs[widx] = ByteBuffer.wrap(toFlush[bidx]); - writevBufs[widx + 1] = newlineBuf; - bidx++; - widx+=2; - } - if (widx > 0) { - channel.write(writevBufs, 0, widx); - } - } - - @Override - public void receiveSpan(Span span) { - if (span.getTracerId().isEmpty()) { - span.setTracerId(tracerId.get()); - } - - // Serialize the span data into a byte[]. Note that we're not holding the - // lock here, to improve concurrency. - byte jsonBuf[] = null; - try { - jsonBuf = JSON_WRITER.writeValueAsBytes(span); - } catch (JsonProcessingException e) { - LOG.error("receiveSpan(path=" + path + ", span=" + span + "): " + - "Json processing error: " + e.getMessage()); - return; - } - - // Grab the bufferLock and put our jsonBuf into the list of buffers to - // flush. - byte toFlush[][] = null; - bufferLock.lock(); - try { - if (bufferedSpans == null) { - LOG.debug("receiveSpan(path=" + path + ", span=" + span + "): " + - "LocalFileSpanReceiver for " + path + " is closed."); - return; - } - bufferedSpans[bufferedSpansIndex] = jsonBuf; - bufferedSpansIndex++; - if (bufferedSpansIndex == bufferedSpans.length) { - // If we've hit the limit for the number of buffers to flush, - // swap out the existing bufferedSpans array for a new array, and - // prepare to flush those spans to disk. - toFlush = bufferedSpans; - bufferedSpansIndex = 0; - bufferedSpans = new byte[bufferedSpans.length][]; - } - } finally { - bufferLock.unlock(); - } - if (toFlush != null) { - // We released the bufferLock above, to avoid blocking concurrent - // receiveSpan calls. But now, we must take the channelLock, to make - // sure that we have sole access to the output channel. If we did not do - // this, we might get interleaved output. - // - // There is a small chance that another thread doing a flush of more - // recent spans could get ahead of us here, and take the lock before we - // do. This is ok, since spans don't have to be written out in order. - channelLock.lock(); - try { - doFlush(toFlush, toFlush.length); - } catch (IOException ioe) { - LOG.error("Error flushing buffers to " + path + ": " + - ioe.getMessage()); - } finally { - channelLock.unlock(); - } - } - } - - @Override - public void close() throws IOException { - byte toFlush[][] = null; - int numToFlush = 0; - bufferLock.lock(); - try { - if (bufferedSpans == null) { - LOG.info("LocalFileSpanReceiver for " + path + " was already closed."); - return; - } - numToFlush = bufferedSpansIndex; - bufferedSpansIndex = 0; - toFlush = bufferedSpans; - bufferedSpans = null; - } finally { - bufferLock.unlock(); - } - channelLock.lock(); - try { - doFlush(toFlush, numToFlush); - } catch (IOException ioe) { - LOG.error("Error flushing buffers to " + path + ": " + - ioe.getMessage()); - } finally { - try { - stream.close(); - } catch (IOException e) { - LOG.error("Error closing stream for " + path, e); - } - channelLock.unlock(); - } - } - - public static String getUniqueLocalTraceFileName() { - String tmp = System.getProperty("java.io.tmpdir", "/tmp"); - String nonce = null; - BufferedReader reader = null; - try { - // On Linux we can get a unique local file name by reading the process id - // out of /proc/self/stat. (There isn't any portable way to get the - // process ID from Java.) - reader = new BufferedReader( - new InputStreamReader(new FileInputStream("/proc/self/stat"), - "UTF-8")); - String line = reader.readLine(); - if (line == null) { - throw new EOFException(); - } - nonce = line.split(" ")[0]; - } catch (IOException e) { - } finally { - if (reader != null) { - try { - reader.close(); - } catch(IOException e) { - LOG.warn("Exception in closing " + reader, e); - } - } - } - if (nonce == null) { - // If we can't use the process ID, use a random nonce. - nonce = UUID.randomUUID().toString(); - } - return new File(tmp, nonce).getAbsolutePath(); - } -}
