http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core/src/test/java/org/apache/htrace/core/TraceCreator.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/core/TraceCreator.java b/htrace-core/src/test/java/org/apache/htrace/core/TraceCreator.java deleted file mode 100644 index b843999..0000000 --- a/htrace-core/src/test/java/org/apache/htrace/core/TraceCreator.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.core; - -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -/** - * Does some stuff and traces it. - */ -public class TraceCreator { - public static final String RPC_TRACE_ROOT = "createSampleRpcTrace"; - public static final String THREADED_TRACE_ROOT = "createThreadedTrace"; - public static final String SIMPLE_TRACE_ROOT = "createSimpleTrace"; - - private final Tracer tracer; - - public TraceCreator(Tracer tracer) { - this.tracer = tracer; - } - - public void createSampleRpcTrace() { - TraceScope s = tracer.newScope(RPC_TRACE_ROOT); - try { - pretendRpcSend(); - } finally { - s.close(); - } - } - - public void createSimpleTrace() { - TraceScope s = tracer.newScope(SIMPLE_TRACE_ROOT); - try { - importantWork1(); - } finally { - s.close(); - } - } - - /** - * Creates the demo trace (will create different traces from call to call). - */ - public void createThreadedTrace() { - TraceScope s = tracer.newScope(THREADED_TRACE_ROOT); - try { - Random r = ThreadLocalRandom.current(); - int numThreads = r.nextInt(4) + 1; - Thread[] threads = new Thread[numThreads]; - - for (int i = 0; i < numThreads; i++) { - threads[i] = new Thread(tracer.wrap(new MyRunnable(), null)); - } - for (int i = 0; i < numThreads; i++) { - threads[i].start(); - } - for (int i = 0; i < numThreads; i++) { - try { - threads[i].join(); - } catch (InterruptedException e) { - } - } - importantWork1(); - } finally { - s.close(); - } - } - - private void importantWork1() { - TraceScope cur = tracer.newScope("important work 1"); - try { - Thread.sleep((long) (2000 * Math.random())); - importantWork2(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - cur.close(); - } - } - - private void importantWork2() { - TraceScope cur = tracer.newScope("important work 2"); - try { - Thread.sleep((long) (2000 * Math.random())); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - cur.close(); - } - } - - private class MyRunnable implements Runnable { - @Override - public void run() { - try { - Thread.sleep(750); - Random r = ThreadLocalRandom.current(); - int importantNumber = 100 / r.nextInt(3); - System.out.println("Important number: " + importantNumber); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (ArithmeticException ae) { - TraceScope c = tracer.newScope("dealing with arithmetic exception."); - try { - Thread.sleep((long) (3000 * Math.random())); - } catch (InterruptedException ie1) { - Thread.currentThread().interrupt(); - } finally { - c.close(); - } - } - } - } - - public void pretendRpcSend() { - Span span = tracer.getCurrentSpan(); - pretendRpcReceiveWithTraceInfo(span.getSpanId()); - } - - public void pretendRpcReceiveWithTraceInfo(SpanId parentId) { - TraceScope s = tracer.newScope("received RPC", parentId); - try { - importantWork1(); - } finally { - s.close(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core/src/test/java/org/apache/htrace/core/TraceGraph.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/core/TraceGraph.java b/htrace-core/src/test/java/org/apache/htrace/core/TraceGraph.java deleted file mode 100644 index a06e620..0000000 --- a/htrace-core/src/test/java/org/apache/htrace/core/TraceGraph.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.core; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.TreeSet; - -/** - * Used to create the graph formed by spans. - */ -public class TraceGraph { - private static final Log LOG = LogFactory.getLog(Tracer.class); - - - public static class SpansByParent { - /** - * Compare two spans by span ID. - */ - private static Comparator<Span> COMPARATOR = - new Comparator<Span>() { - @Override - public int compare(Span a, Span b) { - return a.getSpanId().compareTo(b.getSpanId()); - } - }; - - private final TreeSet<Span> treeSet; - - private final HashMap<SpanId, LinkedList<Span>> parentToSpans; - - SpansByParent(Collection<Span> spans) { - TreeSet<Span> treeSet = new TreeSet<Span>(COMPARATOR); - parentToSpans = new HashMap<SpanId, LinkedList<Span>>(); - for (Span span : spans) { - treeSet.add(span); - for (SpanId parent : span.getParents()) { - LinkedList<Span> list = parentToSpans.get(parent); - if (list == null) { - list = new LinkedList<Span>(); - parentToSpans.put(parent, list); - } - list.add(span); - } - if (span.getParents().length == 0) { - LinkedList<Span> list = parentToSpans.get(SpanId.INVALID); - if (list == null) { - list = new LinkedList<Span>(); - parentToSpans.put(SpanId.INVALID, list); - } - list.add(span); - } - } - this.treeSet = treeSet; - } - - public List<Span> find(SpanId parentId) { - LinkedList<Span> spans = parentToSpans.get(parentId); - if (spans == null) { - return new LinkedList<Span>(); - } - return spans; - } - - public Iterator<Span> iterator() { - return Collections.unmodifiableSortedSet(treeSet).iterator(); - } - } - - public static class SpansByTracerId { - /** - * Compare two spans by process ID, and then by span ID. - */ - private static Comparator<Span> COMPARATOR = - new Comparator<Span>() { - @Override - public int compare(Span a, Span b) { - int cmp = a.getTracerId().compareTo(b.getTracerId()); - if (cmp != 0) { - return cmp; - } - return a.getSpanId().compareTo(b.getSpanId()); - } - }; - - private final TreeSet<Span> treeSet; - - SpansByTracerId(Collection<Span> spans) { - TreeSet<Span> treeSet = new TreeSet<Span>(COMPARATOR); - for (Span span : spans) { - treeSet.add(span); - } - this.treeSet = treeSet; - } - - public List<Span> find(String tracerId) { - List<Span> spans = new ArrayList<Span>(); - Span span = new MilliSpan.Builder(). - spanId(SpanId.INVALID). - tracerId(tracerId). - build(); - while (true) { - span = treeSet.higher(span); - if (span == null) { - break; - } - if (span.getTracerId().equals(tracerId)) { - break; - } - spans.add(span); - } - return spans; - } - - public Iterator<Span> iterator() { - return Collections.unmodifiableSortedSet(treeSet).iterator(); - } - } - - private final SpansByParent spansByParent; - private final SpansByTracerId spansByTracerId; - - /** - * Create a new TraceGraph - * - * @param spans The collection of spans to use to create this TraceGraph. Should - * have at least one root span. - */ - public TraceGraph(Collection<Span> spans) { - this.spansByParent = new SpansByParent(spans); - this.spansByTracerId = new SpansByTracerId(spans); - } - - public SpansByParent getSpansByParent() { - return spansByParent; - } - - public SpansByTracerId getSpansByTracerId() { - return spansByTracerId; - } - - @Override - public String toString() { - StringBuilder bld = new StringBuilder(); - String prefix = ""; - for (Iterator<Span> iter = spansByParent.iterator(); iter.hasNext();) { - Span span = iter.next(); - bld.append(prefix).append(span.toString()); - prefix = "\n"; - } - return bld.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java deleted file mode 100644 index 7cb4aed..0000000 --- a/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.util; - -import java.io.File; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Utilities for writing unit tests. - */ -public class TestUtil { - /** - * Get a dump of the stack traces of all threads. - */ - public static String threadDump() { - StringBuilder dump = new StringBuilder(); - Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces(); - for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) { - Thread thread = e.getKey(); - dump.append(String.format( - "\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", - thread.getName(), - (thread.isDaemon() ? "daemon" : ""), - thread.getPriority(), - thread.getId(), - Thread.State.WAITING.equals(thread.getState()) ? - "in Object.wait()" : thread.getState().name().toLowerCase(), - Thread.State.WAITING.equals(thread.getState()) ? - "WAITING (on object monitor)" : thread.getState())); - for (StackTraceElement stackTraceElement : e.getValue()) { - dump.append("\n at "); - dump.append(stackTraceElement); - } - dump.append("\n"); - } - return dump.toString(); - } - - /** - * A callback which returns a value of type T. - * - * TODO: remove this when we're on Java 8, in favor of - * java.util.function.Supplier. - */ - public interface Supplier<T> { - T get(); - } - - /** - * Wait for a condition to become true for a configurable amount of time. - * - * @param check The condition to wait for. - * @param periodMs How often to check the condition, in milliseconds. - * @param timeoutMs How long to wait in total, in milliseconds. - */ - public static void waitFor(Supplier<Boolean> check, - long periodMs, long timeoutMs) - throws TimeoutException, InterruptedException - { - long endNs = System.nanoTime() + - TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS); - while (true) { - boolean result = check.get(); - if (result) { - return; - } - long nowNs = System.nanoTime(); - if (nowNs >= endNs) { - throw new TimeoutException("Timed out waiting for test condition. " + - "Thread dump:\n" + threadDump()); - } - Thread.sleep(periodMs); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-core4/pom.xml b/htrace-core4/pom.xml new file mode 100644 index 0000000..efaaf65 --- /dev/null +++ b/htrace-core4/pom.xml @@ -0,0 +1,125 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor +license agreements. See the NOTICE file distributed with this work for additional +information regarding copyright ownership. The ASF licenses this file to +You under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required +by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, either express or implied. See the License for the specific +language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>htrace-core4</artifactId> + <packaging>jar</packaging> + + <parent> + <artifactId>htrace</artifactId> + <groupId>org.apache.htrace</groupId> + <version>4.0.1-incubating</version> + <relativePath>..</relativePath> + </parent> + + <name>htrace-core4</name> + <url>http://incubator.apache.org/projects/htrace.html</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <configuration> + <relocations> + <relocation> + <pattern>org.apache.commons.logging</pattern> + <shadedPattern>org.apache.htrace.commons.logging</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.htrace.fasterxml.jackson</shadedPattern> + </relocation> + </relocations> + </configuration> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-gpg-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <!-- explicitly define maven-deploy-plugin after other to force exec order --> + <artifactId>maven-deploy-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Global deps. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <!-- core specific deps. --> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>dist</id> + <build> + <plugins> + <plugin> + <!--Make it so assembly:single does nothing in here--> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <skipAssembly>true</skipAssembly> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/AlwaysSampler.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/AlwaysSampler.java b/htrace-core4/src/main/java/org/apache/htrace/core/AlwaysSampler.java new file mode 100644 index 0000000..8d5a296 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/AlwaysSampler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +/** + * A Sampler that always returns true. + */ +public final class AlwaysSampler extends Sampler { + public static final AlwaysSampler INSTANCE = new AlwaysSampler(null); + + public AlwaysSampler(HTraceConfiguration conf) { + } + + @Override + public boolean next() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/CountSampler.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/CountSampler.java b/htrace-core4/src/main/java/org/apache/htrace/core/CountSampler.java new file mode 100644 index 0000000..5a838c7 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/CountSampler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * Sampler that returns true every N calls. Specify the frequency interval by configuring a + * {@code long} value for {@link #SAMPLER_FREQUENCY_CONF_KEY}. + */ +public class CountSampler extends Sampler { + public final static String SAMPLER_FREQUENCY_CONF_KEY = "sampler.frequency"; + + final long frequency; + long count = ThreadLocalRandom.current().nextLong(); + + public CountSampler(HTraceConfiguration conf) { + this.frequency = Long.parseLong(conf.get(SAMPLER_FREQUENCY_CONF_KEY), 10); + } + + @Override + public boolean next() { + return (count++ % frequency) == 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/HTraceConfiguration.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/HTraceConfiguration.java b/htrace-core4/src/main/java/org/apache/htrace/core/HTraceConfiguration.java new file mode 100644 index 0000000..c6e445b --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/HTraceConfiguration.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Wrapper which integrating applications should implement in order + * to provide tracing configuration. + */ +public abstract class HTraceConfiguration { + + private static final Log LOG = LogFactory.getLog(HTraceConfiguration.class); + + private static final Map<String, String> EMPTY_MAP = new HashMap<String, String>(1); + + /** + * An empty HTrace configuration. + */ + public static final HTraceConfiguration EMPTY = fromMap(EMPTY_MAP); + + /** + * Create an HTrace configuration from a map. + * + * @param conf The map to create the configuration from. + * @return The new configuration. + */ + public static HTraceConfiguration fromMap(Map<String, String> conf) { + return new MapConf(conf); + } + + public static HTraceConfiguration fromKeyValuePairs(String... pairs) { + if ((pairs.length % 2) != 0) { + throw new RuntimeException("You must specify an equal number of keys " + + "and values."); + } + Map<String, String> conf = new HashMap<String, String>(); + for (int i = 0; i < pairs.length; i+=2) { + conf.put(pairs[i], pairs[i + 1]); + } + return new MapConf(conf); + } + + public abstract String get(String key); + + public abstract String get(String key, String defaultValue); + + public boolean getBoolean(String key, boolean defaultValue) { + String value = get(key, String.valueOf(defaultValue)).trim().toLowerCase(); + + if ("true".equals(value)) { + return true; + } else if ("false".equals(value)) { + return false; + } + + LOG.warn("Expected boolean for key [" + key + "] instead got [" + value + "]."); + return defaultValue; + } + + public int getInt(String key, int defaultVal) { + String val = get(key); + if (val == null || val.trim().isEmpty()) { + return defaultVal; + } + try { + return Integer.parseInt(val); + } catch (NumberFormatException nfe) { + throw new IllegalArgumentException("Bad value for '" + key + "': should be int"); + } + } + + private static class MapConf extends HTraceConfiguration { + private final Map<String, String> conf; + + public MapConf(Map<String, String> conf) { + this.conf = new HashMap<String, String>(conf); + } + + @Override + public String get(String key) { + return conf.get(key); + } + + @Override + public String get(String key, String defaultValue) { + String value = get(key); + return value == null ? defaultValue : value; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/LocalFileSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/LocalFileSpanReceiver.java b/htrace-core4/src/main/java/org/apache/htrace/core/LocalFileSpanReceiver.java new file mode 100644 index 0000000..69a43b1 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/LocalFileSpanReceiver.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.FileSystems; +import java.nio.file.StandardOpenOption; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Writes the spans it receives to a local file. + */ +public class LocalFileSpanReceiver extends SpanReceiver { + private static final Log LOG = LogFactory.getLog(LocalFileSpanReceiver.class); + public static final String PATH_KEY = "local.file.span.receiver.path"; + public static final String CAPACITY_KEY = "local.file.span.receiver.capacity"; + public static final int CAPACITY_DEFAULT = 5000; + private static ObjectWriter JSON_WRITER = new ObjectMapper().writer(); + private final String path; + + private byte[][] bufferedSpans; + private int bufferedSpansIndex; + private final ReentrantLock bufferLock = new ReentrantLock(); + + private final FileOutputStream stream; + private final FileChannel channel; + private final ReentrantLock channelLock = new ReentrantLock(); + + public LocalFileSpanReceiver(HTraceConfiguration conf) { + int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT); + if (capacity < 1) { + throw new IllegalArgumentException(CAPACITY_KEY + " must not be " + + "less than 1."); + } + String pathStr = conf.get(PATH_KEY); + if (pathStr == null || pathStr.isEmpty()) { + path = getUniqueLocalTraceFileName(); + } else { + path = pathStr; + } + boolean success = false; + try { + this.stream = new FileOutputStream(path, true); + } catch (IOException ioe) { + LOG.error("Error opening " + path + ": " + ioe.getMessage()); + throw new RuntimeException(ioe); + } + this.channel = stream.getChannel(); + if (this.channel == null) { + try { + this.stream.close(); + } catch (IOException e) { + LOG.error("Error closing " + path, e); + } + LOG.error("Failed to get channel for " + path); + throw new RuntimeException("Failed to get channel for " + path); + } + this.bufferedSpans = new byte[capacity][]; + this.bufferedSpansIndex = 0; + if (LOG.isDebugEnabled()) { + LOG.debug("Created new LocalFileSpanReceiver with path = " + path + + ", capacity = " + capacity); + } + } + + /** + * Number of buffers to use in FileChannel#write. + * + * On UNIX, FileChannel#write uses writev-- a kernel interface that allows + * us to send multiple buffers at once. This is more efficient than making a + * separate write call for each buffer, since it minimizes the number of + * transitions from userspace to kernel space. + */ + private final int WRITEV_SIZE = 20; + + private final static ByteBuffer newlineBuf = + ByteBuffer.wrap(new byte[] { (byte)0xa }); + + /** + * Flushes a bufferedSpans array. + */ + private void doFlush(byte[][] toFlush, int len) throws IOException { + int bidx = 0, widx = 0; + ByteBuffer writevBufs[] = new ByteBuffer[2 * WRITEV_SIZE]; + + while (true) { + if (widx == writevBufs.length) { + channel.write(writevBufs); + widx = 0; + } + if (bidx == len) { + break; + } + writevBufs[widx] = ByteBuffer.wrap(toFlush[bidx]); + writevBufs[widx + 1] = newlineBuf; + bidx++; + widx+=2; + } + if (widx > 0) { + channel.write(writevBufs, 0, widx); + } + } + + @Override + public void receiveSpan(Span span) { + // Serialize the span data into a byte[]. Note that we're not holding the + // lock here, to improve concurrency. + byte jsonBuf[] = null; + try { + jsonBuf = JSON_WRITER.writeValueAsBytes(span); + } catch (JsonProcessingException e) { + LOG.error("receiveSpan(path=" + path + ", span=" + span + "): " + + "Json processing error: " + e.getMessage()); + return; + } + + // Grab the bufferLock and put our jsonBuf into the list of buffers to + // flush. + byte toFlush[][] = null; + bufferLock.lock(); + try { + if (bufferedSpans == null) { + LOG.debug("receiveSpan(path=" + path + ", span=" + span + "): " + + "LocalFileSpanReceiver for " + path + " is closed."); + return; + } + bufferedSpans[bufferedSpansIndex] = jsonBuf; + bufferedSpansIndex++; + if (bufferedSpansIndex == bufferedSpans.length) { + // If we've hit the limit for the number of buffers to flush, + // swap out the existing bufferedSpans array for a new array, and + // prepare to flush those spans to disk. + toFlush = bufferedSpans; + bufferedSpansIndex = 0; + bufferedSpans = new byte[bufferedSpans.length][]; + } + } finally { + bufferLock.unlock(); + } + if (toFlush != null) { + // We released the bufferLock above, to avoid blocking concurrent + // receiveSpan calls. But now, we must take the channelLock, to make + // sure that we have sole access to the output channel. If we did not do + // this, we might get interleaved output. + // + // There is a small chance that another thread doing a flush of more + // recent spans could get ahead of us here, and take the lock before we + // do. This is ok, since spans don't have to be written out in order. + channelLock.lock(); + try { + doFlush(toFlush, toFlush.length); + } catch (IOException ioe) { + LOG.error("Error flushing buffers to " + path + ": " + + ioe.getMessage()); + } finally { + channelLock.unlock(); + } + } + } + + @Override + public void close() throws IOException { + byte toFlush[][] = null; + int numToFlush = 0; + bufferLock.lock(); + try { + if (bufferedSpans == null) { + LOG.info("LocalFileSpanReceiver for " + path + " was already closed."); + return; + } + numToFlush = bufferedSpansIndex; + bufferedSpansIndex = 0; + toFlush = bufferedSpans; + bufferedSpans = null; + } finally { + bufferLock.unlock(); + } + channelLock.lock(); + try { + doFlush(toFlush, numToFlush); + } catch (IOException ioe) { + LOG.error("Error flushing buffers to " + path + ": " + + ioe.getMessage()); + } finally { + try { + stream.close(); + } catch (IOException e) { + LOG.error("Error closing stream for " + path, e); + } + channelLock.unlock(); + } + } + + public static String getUniqueLocalTraceFileName() { + String tmp = System.getProperty("java.io.tmpdir", "/tmp"); + String nonce = null; + BufferedReader reader = null; + try { + // On Linux we can get a unique local file name by reading the process id + // out of /proc/self/stat. (There isn't any portable way to get the + // process ID from Java.) + reader = new BufferedReader( + new InputStreamReader(new FileInputStream("/proc/self/stat"), + "UTF-8")); + String line = reader.readLine(); + if (line == null) { + throw new EOFException(); + } + nonce = line.split(" ")[0]; + } catch (IOException e) { + } finally { + if (reader != null) { + try { + reader.close(); + } catch(IOException e) { + LOG.warn("Exception in closing " + reader, e); + } + } + } + if (nonce == null) { + // If we can't use the process ID, use a random nonce. + nonce = UUID.randomUUID().toString(); + } + return new File(tmp, nonce).getAbsolutePath(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/MilliSpan.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/MilliSpan.java b/htrace-core4/src/main/java/org/apache/htrace/core/MilliSpan.java new file mode 100644 index 0000000..5dd6bdb --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/MilliSpan.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A Span implementation that stores its information in milliseconds since the + * epoch. + */ +@JsonDeserialize(using = MilliSpan.MilliSpanDeserializer.class) +public class MilliSpan implements Span { + private static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static ObjectReader JSON_READER = OBJECT_MAPPER.reader(MilliSpan.class); + private static ObjectWriter JSON_WRITER = OBJECT_MAPPER.writer(); + private static final SpanId EMPTY_PARENT_ARRAY[] = new SpanId[0]; + private static final String EMPTY_STRING = ""; + + private long begin; + private long end; + private final String description; + private SpanId parents[]; + private final SpanId spanId; + private Map<String, String> traceInfo = null; + private String tracerId; + private List<TimelineAnnotation> timeline = null; + + @Override + public Span child(String childDescription) { + return new MilliSpan.Builder(). + begin(System.currentTimeMillis()). + end(0). + description(childDescription). + parents(new SpanId[] {spanId}). + spanId(spanId.newChildId()). + tracerId(tracerId). + build(); + } + + /** + * The public interface for constructing a MilliSpan. + */ + public static class Builder { + private long begin; + private long end; + private String description = EMPTY_STRING; + private SpanId parents[] = EMPTY_PARENT_ARRAY; + private SpanId spanId = SpanId.INVALID; + private Map<String, String> traceInfo = null; + private String tracerId = EMPTY_STRING; + private List<TimelineAnnotation> timeline = null; + + public Builder() { + } + + public Builder begin(long begin) { + this.begin = begin; + return this; + } + + public Builder end(long end) { + this.end = end; + return this; + } + + public Builder description(String description) { + this.description = description; + return this; + } + + public Builder parents(SpanId parents[]) { + this.parents = parents; + return this; + } + + public Builder parents(List<SpanId> parentList) { + SpanId[] parents = new SpanId[parentList.size()]; + for (int i = 0; i < parentList.size(); i++) { + parents[i] = parentList.get(i); + } + this.parents = parents; + return this; + } + + public Builder spanId(SpanId spanId) { + this.spanId = spanId; + return this; + } + + public Builder traceInfo(Map<String, String> traceInfo) { + this.traceInfo = traceInfo.isEmpty() ? null : traceInfo; + return this; + } + + public Builder tracerId(String tracerId) { + this.tracerId = tracerId; + return this; + } + + public Builder timeline(List<TimelineAnnotation> timeline) { + this.timeline = timeline.isEmpty() ? null : timeline; + return this; + } + + public MilliSpan build() { + return new MilliSpan(this); + } + } + + public MilliSpan() { + this.begin = 0; + this.end = 0; + this.description = EMPTY_STRING; + this.parents = EMPTY_PARENT_ARRAY; + this.spanId = SpanId.INVALID; + this.traceInfo = null; + this.tracerId = EMPTY_STRING; + this.timeline = null; + } + + private MilliSpan(Builder builder) { + this.begin = builder.begin; + this.end = builder.end; + this.description = builder.description; + this.parents = builder.parents; + this.spanId = builder.spanId; + this.traceInfo = builder.traceInfo; + this.tracerId = builder.tracerId; + this.timeline = builder.timeline; + } + + @Override + public synchronized void stop() { + if (end == 0) { + if (begin == 0) + throw new IllegalStateException("Span for " + description + + " has not been started"); + end = System.currentTimeMillis(); + } + } + + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } + + @Override + public synchronized boolean isRunning() { + return begin != 0 && end == 0; + } + + @Override + public synchronized long getAccumulatedMillis() { + if (begin == 0) + return 0; + if (end > 0) + return end - begin; + return currentTimeMillis() - begin; + } + + @Override + public String toString() { + return toJson(); + } + + @Override + public String getDescription() { + return description; + } + + @Override + public SpanId getSpanId() { + return spanId; + } + + @Override + public SpanId[] getParents() { + return parents; + } + + @Override + public void setParents(SpanId[] parents) { + this.parents = parents; + } + + @Override + public long getStartTimeMillis() { + return begin; + } + + @Override + public long getStopTimeMillis() { + return end; + } + + @Override + public void addKVAnnotation(String key, String value) { + if (traceInfo == null) + traceInfo = new HashMap<String, String>(); + traceInfo.put(key, value); + } + + @Override + public void addTimelineAnnotation(String msg) { + if (timeline == null) { + timeline = new ArrayList<TimelineAnnotation>(); + } + timeline.add(new TimelineAnnotation(System.currentTimeMillis(), msg)); + } + + @Override + public Map<String, String> getKVAnnotations() { + if (traceInfo == null) + return Collections.emptyMap(); + return Collections.unmodifiableMap(traceInfo); + } + + @Override + public List<TimelineAnnotation> getTimelineAnnotations() { + if (timeline == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(timeline); + } + + @Override + public String getTracerId() { + return tracerId; + } + + @Override + public void setTracerId(String tracerId) { + this.tracerId = tracerId; + } + + @Override + public String toJson() { + StringWriter writer = new StringWriter(); + try { + JSON_WRITER.writeValue(writer, this); + } catch (IOException e) { + // An IOException should not be possible when writing to a string. + throw new RuntimeException(e); + } + return writer.toString(); + } + + public static class MilliSpanDeserializer + extends JsonDeserializer<MilliSpan> { + @Override + public MilliSpan deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + JsonNode root = jp.getCodec().readTree(jp); + Builder builder = new Builder(); + JsonNode bNode = root.get("b"); + if (bNode != null) { + builder.begin(bNode.asLong()); + } + JsonNode eNode = root.get("e"); + if (eNode != null) { + builder.end(eNode.asLong()); + } + JsonNode dNode = root.get("d"); + if (dNode != null) { + builder.description(dNode.asText()); + } + JsonNode sNode = root.get("a"); + if (sNode != null) { + builder.spanId(SpanId.fromString(sNode.asText())); + } + JsonNode rNode = root.get("r"); + if (rNode != null) { + builder.tracerId(rNode.asText()); + } + JsonNode parentsNode = root.get("p"); + LinkedList<SpanId> parents = new LinkedList<SpanId>(); + if (parentsNode != null) { + for (Iterator<JsonNode> iter = parentsNode.elements(); + iter.hasNext(); ) { + JsonNode parentIdNode = iter.next(); + parents.add(SpanId.fromString(parentIdNode.asText())); + } + } + builder.parents(parents); + JsonNode traceInfoNode = root.get("n"); + if (traceInfoNode != null) { + HashMap<String, String> traceInfo = new HashMap<String, String>(); + for (Iterator<String> iter = traceInfoNode.fieldNames(); + iter.hasNext(); ) { + String field = iter.next(); + traceInfo.put(field, traceInfoNode.get(field).asText()); + } + builder.traceInfo(traceInfo); + } + JsonNode timelineNode = root.get("t"); + if (timelineNode != null) { + LinkedList<TimelineAnnotation> timeline = + new LinkedList<TimelineAnnotation>(); + for (Iterator<JsonNode> iter = timelineNode.elements(); + iter.hasNext(); ) { + JsonNode ann = iter.next(); + timeline.add(new TimelineAnnotation(ann.get("t").asLong(), + ann.get("m").asText())); + } + builder.timeline(timeline); + } + return builder.build(); + } + } + + public static MilliSpan fromJson(String json) throws IOException { + return JSON_READER.readValue(json); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/NeverSampler.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/NeverSampler.java b/htrace-core4/src/main/java/org/apache/htrace/core/NeverSampler.java new file mode 100644 index 0000000..60cc7d2 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/NeverSampler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +/** + * A Sampler that never returns true. + */ +public final class NeverSampler extends Sampler { + public static final NeverSampler INSTANCE = new NeverSampler(null); + + public NeverSampler(HTraceConfiguration conf) { + } + + @Override + public boolean next() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/NullScope.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/NullScope.java b/htrace-core4/src/main/java/org/apache/htrace/core/NullScope.java new file mode 100644 index 0000000..fe76e46 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/NullScope.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +/** + * An empty {@link TraceScope}. + */ +class NullScope extends TraceScope { + NullScope(Tracer tracer) { + super(tracer, null, null); + } + + @Override + public SpanId getSpanId() { + return SpanId.INVALID; + } + + @Override + public void detach() { + if (detached) { + Tracer.throwClientError("Can't detach this TraceScope because " + + "it is already detached."); + } + detached = true; + } + + @Override + public void reattach() { + if (!detached) { + Tracer.throwClientError("Can't reattach this TraceScope because " + + "it is not detached."); + } + detached = false; + } + + @Override + public void close() { + tracer.popNullScope(); + } + + @Override + public String toString() { + return "NullScope"; + } + + @Override + public void addKVAnnotation(String key, String value) { + // do nothing + } + + @Override + public void addTimelineAnnotation(String msg) { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/POJOSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/POJOSpanReceiver.java b/htrace-core4/src/main/java/org/apache/htrace/core/POJOSpanReceiver.java new file mode 100644 index 0000000..34322fa --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/POJOSpanReceiver.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; + +/** + * SpanReceiver for testing only that just collects the Span objects it + * receives. The spans it receives can be accessed with getSpans(); + */ +public class POJOSpanReceiver extends SpanReceiver { + private final Collection<Span> spans; + + public POJOSpanReceiver(HTraceConfiguration conf) { + this.spans = new HashSet<Span>(); + } + + /** + * @return The spans this POJOSpanReceiver has received. + */ + public Collection<Span> getSpans() { + return spans; + } + + @Override + public void close() throws IOException { + } + + @Override + public void receiveSpan(Span span) { + spans.add(span); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/ProbabilitySampler.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/ProbabilitySampler.java b/htrace-core4/src/main/java/org/apache/htrace/core/ProbabilitySampler.java new file mode 100644 index 0000000..c0bb16c --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/ProbabilitySampler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Sampler that returns true a certain percentage of the time. Specify the frequency interval by + * configuring a {@code double} value for {@link #SAMPLER_FRACTION_CONF_KEY}. + */ +public class ProbabilitySampler extends Sampler { + private static final Log LOG = LogFactory.getLog(ProbabilitySampler.class); + public final double threshold; + public final static String SAMPLER_FRACTION_CONF_KEY = "sampler.fraction"; + + public ProbabilitySampler(HTraceConfiguration conf) { + this.threshold = Double.parseDouble(conf.get(SAMPLER_FRACTION_CONF_KEY)); + if (LOG.isTraceEnabled()) { + LOG.trace("Created new ProbabilitySampler with threshold = " + + threshold + "."); + } + } + + @Override + public boolean next() { + return ThreadLocalRandom.current().nextDouble() < threshold; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/Sampler.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Sampler.java b/htrace-core4/src/main/java/org/apache/htrace/core/Sampler.java new file mode 100644 index 0000000..af0165c --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/Sampler.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.lang.reflect.Constructor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Extremely simple callback to determine the frequency that an action should be + * performed. + * <p/> + * For example, the next() function may look like this: + * <p/> + * <pre> + * <code> + * public boolean next() { + * return Math.random() > 0.5; + * } + * </code> + * </pre> + * This would trace 50% of all gets, 75% of all puts and would not trace any other requests. + */ +public abstract class Sampler { + /** + * A {@link Sampler} builder. It takes a {@link Sampler} class name and + * constructs an instance of that class, with the provided configuration. + */ + public static class Builder { + private static final Log LOG = LogFactory.getLog(Builder.class); + + private final static String DEFAULT_PACKAGE = "org.apache.htrace.core"; + private final HTraceConfiguration conf; + private String className; + private ClassLoader classLoader = Builder.class.getClassLoader(); + + public Builder(HTraceConfiguration conf) { + this.conf = conf; + reset(); + } + + public Builder reset() { + this.className = null; + return this; + } + + public Builder className(String className) { + this.className = className; + return this; + } + + public Builder classLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + private void throwError(String errorStr) { + LOG.error(errorStr); + throw new RuntimeException(errorStr); + } + + private void throwError(String errorStr, Throwable e) { + LOG.error(errorStr, e); + throw new RuntimeException(errorStr, e); + } + + public Sampler build() { + Sampler sampler = newSampler(); + if (LOG.isTraceEnabled()) { + LOG.trace("Created new sampler of type " + + sampler.getClass().getName(), new Exception()); + } + return sampler; + } + + private Sampler newSampler() { + if (className == null || className.isEmpty()) { + throwError("No sampler class specified."); + } + String str = className; + if (!str.contains(".")) { + str = DEFAULT_PACKAGE + "." + str; + } + Class cls = null; + try { + cls = classLoader.loadClass(str); + } catch (ClassNotFoundException e) { + throwError("Cannot find Sampler class " + str); + } + Constructor<Sampler> ctor = null; + try { + ctor = cls.getConstructor(HTraceConfiguration.class); + } catch (NoSuchMethodException e) { + throwError("Cannot find a constructor for class " + + str + "which takes an HTraceConfiguration."); + } + Sampler sampler = null; + try { + LOG.debug("Creating new instance of " + str + "..."); + sampler = ctor.newInstance(conf); + } catch (ReflectiveOperationException e) { + throwError("Reflection error when constructing " + + str + ".", e); + } catch (Throwable t) { + throwError("NewInstance error when constructing " + + str + ".", t); + } + return sampler; + } + } + + public static final Sampler ALWAYS = AlwaysSampler.INSTANCE; + public static final Sampler NEVER = NeverSampler.INSTANCE; + + public abstract boolean next(); +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/Span.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Span.java b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java new file mode 100644 index 0000000..e63d414 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/Span.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +/** + * Base interface for gathering and reporting statistics about a block of + * execution. + * <p/> + * Spans should form a directed acyclic graph structure. It should be possible + * to keep following the parents of a span until you arrive at a span with no + * parents.<p/> + */ +@JsonSerialize(using = Span.SpanSerializer.class) +public interface Span { + /** + * The block has completed, stop the clock + */ + void stop(); + + /** + * Get the start time, in milliseconds + */ + long getStartTimeMillis(); + + /** + * Get the stop time, in milliseconds + */ + long getStopTimeMillis(); + + /** + * Return the total amount of time elapsed since start was called, if running, + * or difference between stop and start + */ + long getAccumulatedMillis(); + + /** + * Has the span been started and not yet stopped? + */ + boolean isRunning(); + + /** + * Return a textual description of this span.<p/> + * + * Will never be null. + */ + String getDescription(); + + /** + * A pseudo-unique (random) number assigned to this span instance.<p/> + * + * The spanId is immutable and cannot be changed. It is safe to access this + * from multiple threads. + */ + SpanId getSpanId(); + + /** + * Create a child span of this span with the given description + * @deprecated Since 4.0.0. Use {@link MilliSpan.Builder} + */ + @Deprecated + Span child(String description); + + @Override + String toString(); + + /** + * Returns the parent IDs of the span.<p/> + * + * The array will be empty if there are no parents. + */ + SpanId[] getParents(); + + /** + * Set the parents of this span.<p/> + * + * Any existing parents will be cleared by this call. + */ + void setParents(SpanId[] parents); + + /** + * Add a data annotation associated with this span + */ + void addKVAnnotation(String key, String value); + + /** + * Add a timeline annotation associated with this span + */ + void addTimelineAnnotation(String msg); + + /** + * Get data associated with this span (read only)<p/> + * + * Will never be null. + */ + Map<String, String> getKVAnnotations(); + + /** + * Get any timeline annotations (read only)<p/> + * + * Will never be null. + */ + List<TimelineAnnotation> getTimelineAnnotations(); + + /** + * Return a unique id for the process from which this Span originated.<p/> + * + * Will never be null. + */ + String getTracerId(); + + /** + * Set the process id of a span. + */ + void setTracerId(String s); + + /** + * Serialize to Json + */ + String toJson(); + + public static class SpanSerializer extends JsonSerializer<Span> { + @Override + public void serialize(Span span, JsonGenerator jgen, SerializerProvider provider) + throws IOException { + jgen.writeStartObject(); + if (span.getSpanId().isValid()) { + jgen.writeStringField("a", span.getSpanId().toString()); + } + if (span.getStartTimeMillis() != 0) { + jgen.writeNumberField("b", span.getStartTimeMillis()); + } + if (span.getStopTimeMillis() != 0) { + jgen.writeNumberField("e", span.getStopTimeMillis()); + } + if (!span.getDescription().isEmpty()) { + jgen.writeStringField("d", span.getDescription()); + } + String tracerId = span.getTracerId(); + if (!tracerId.isEmpty()) { + jgen.writeStringField("r", tracerId); + } + jgen.writeArrayFieldStart("p"); + for (SpanId parent : span.getParents()) { + jgen.writeString(parent.toString()); + } + jgen.writeEndArray(); + Map<String, String> traceInfoMap = span.getKVAnnotations(); + if (!traceInfoMap.isEmpty()) { + jgen.writeObjectFieldStart("n"); + for (Map.Entry<String, String> e : traceInfoMap.entrySet()) { + jgen.writeStringField(e.getKey(), e.getValue()); + } + jgen.writeEndObject(); + } + List<TimelineAnnotation> timelineAnnotations = + span.getTimelineAnnotations(); + if (!timelineAnnotations.isEmpty()) { + jgen.writeArrayFieldStart("t"); + for (TimelineAnnotation tl : timelineAnnotations) { + jgen.writeStartObject(); + jgen.writeNumberField("t", tl.getTime()); + jgen.writeStringField("m", tl.getMessage()); + jgen.writeEndObject(); + } + jgen.writeEndArray(); + } + jgen.writeEndObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/SpanId.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/SpanId.java b/htrace-core4/src/main/java/org/apache/htrace/core/SpanId.java new file mode 100644 index 0000000..ed31ad3 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/SpanId.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * Uniquely identifies an HTrace span. + * + * Span IDs are 128 bits in total. The upper 64 bits of a span ID is the same + * as the upper 64 bits of the parent span, if there is one. The lower 64 bits + * are always random. + */ +public final class SpanId implements Comparable<SpanId> { + private static final int SPAN_ID_STRING_LENGTH = 32; + private final long high; + private final long low; + + /** + * The invalid span ID, which is all zeroes. + * + * It is also the "least" span ID in the sense that it is considered + * smaller than any other span ID. + */ + public static SpanId INVALID = new SpanId(0, 0); + + private static long nonZeroRand64() { + while (true) { + long r = ThreadLocalRandom.current().nextLong(); + if (r != 0) { + return r; + } + } + } + + public static SpanId fromRandom() { + return new SpanId(nonZeroRand64(), nonZeroRand64()); + } + + public static SpanId fromString(String str) { + if (str.length() != SPAN_ID_STRING_LENGTH) { + throw new RuntimeException("Invalid SpanID string: length was not " + + SPAN_ID_STRING_LENGTH); + } + long high = + ((Long.parseLong(str.substring(0, 8), 16)) << 32) | + (Long.parseLong(str.substring(8, 16), 16)); + long low = + ((Long.parseLong(str.substring(16, 24), 16)) << 32) | + (Long.parseLong(str.substring(24, 32), 16)); + return new SpanId(high, low); + } + + public SpanId(long high, long low) { + this.high = high; + this.low = low; + } + + public long getHigh() { + return high; + } + + public long getLow() { + return low; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SpanId)) { + return false; + } + SpanId other = (SpanId)o; + return ((other.high == high) && (other.low == low)); + } + + @Override + public int compareTo(SpanId other) { + int cmp = compareAsUnsigned(high, other.high); + if (cmp != 0) { + return cmp; + } + return compareAsUnsigned(low, other.low); + } + + private static int compareAsUnsigned(long a, long b) { + boolean aSign = a < 0; + boolean bSign = b < 0; + if (aSign != bSign) { + if (aSign) { + return 1; + } else { + return -1; + } + } + if (aSign) { + a = -a; + b = -b; + } + if (a < b) { + return -1; + } else if (a > b) { + return 1; + } else { + return 0; + } + } + + @Override + public int hashCode() { + return (int)((0xffffffff & (high >> 32))) ^ + (int)((0xffffffff & (high >> 0))) ^ + (int)((0xffffffff & (low >> 32))) ^ + (int)((0xffffffff & (low >> 0))); + } + + @Override + public String toString() { + return String.format("%08x%08x%08x%08x", + (0x00000000ffffffffL & (high >> 32)), + (0x00000000ffffffffL & high), + (0x00000000ffffffffL & (low >> 32)), + (0x00000000ffffffffL & low)); + } + + public boolean isValid() { + return (high != 0) || (low != 0); + } + + public SpanId newChildId() { + return new SpanId(high, nonZeroRand64()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/SpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/SpanReceiver.java b/htrace-core4/src/main/java/org/apache/htrace/core/SpanReceiver.java new file mode 100644 index 0000000..a955ddf --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/SpanReceiver.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.io.Closeable; +import java.lang.reflect.Constructor; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * The collector within a process that is the destination of Spans when a trace is running. + * {@code SpanReceiver} implementations are expected to provide a constructor with the signature + * <p> + * <pre> + * <code>public SpanReceiverImpl(HTraceConfiguration)</code> + * </pre> + */ +public abstract class SpanReceiver implements Closeable { + /** + * A {@link SpanReceiver} builder. It takes a {@link SpanReceiver} class name + * and constructs an instance of that class, with the provided configuration. + */ + public static class Builder { + private static final Log LOG = LogFactory.getLog(Builder.class); + + private final static String DEFAULT_PACKAGE = "org.apache.htrace.core"; + private final HTraceConfiguration conf; + private boolean logErrors; + private String className; + private ClassLoader classLoader = Builder.class.getClassLoader(); + + public Builder(HTraceConfiguration conf) { + this.conf = conf; + reset(); + } + + /** + * Set this builder back to defaults. + * + * @return this instance. + */ + public Builder reset() { + this.logErrors = true; + this.className = null; + return this; + } + + public Builder className(final String className) { + this.className = className; + return this; + } + + /** + * Configure whether we should log errors during build(). + * @return This instance + */ + public Builder logErrors(boolean logErrors) { + this.logErrors = logErrors; + return this; + } + + public Builder classLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + private void throwError(String errorStr) { + if (logErrors) { + LOG.error(errorStr); + } + throw new RuntimeException(errorStr); + } + + private void throwError(String errorStr, Throwable e) { + if (logErrors) { + LOG.error(errorStr, e); + } + throw new RuntimeException(errorStr, e); + } + + public SpanReceiver build() { + SpanReceiver spanReceiver = newSpanReceiver(); + if (LOG.isTraceEnabled()) { + LOG.trace("Created new span receiver of type " + + spanReceiver.getClass().getName()); + } + return spanReceiver; + } + + private SpanReceiver newSpanReceiver() { + if ((className == null) || className.isEmpty()) { + throwError("No span receiver class specified."); + } + String str = className; + if (!str.contains(".")) { + str = DEFAULT_PACKAGE + "." + str; + } + Class cls = null; + try { + cls = classLoader.loadClass(str); + } catch (ClassNotFoundException e) { + throwError("Cannot find SpanReceiver class " + str); + } + Constructor<SpanReceiver> ctor = null; + try { + ctor = cls.getConstructor(HTraceConfiguration.class); + } catch (NoSuchMethodException e) { + throwError("Cannot find a constructor for class " + + str + "which takes an HTraceConfiguration."); + } + SpanReceiver receiver = null; + try { + LOG.debug("Creating new instance of " + str + "..."); + receiver = ctor.newInstance(conf); + } catch (ReflectiveOperationException e) { + throwError("Reflection error when constructing " + + str + ".", e); + } catch (Throwable t) { + throwError("NewInstance error when constructing " + + str + ".", t); + } + return receiver; + } + } + + /** + * An ID which uniquely identifies this SpanReceiver. + */ + private final long id; + + private static final AtomicLong HIGHEST_SPAN_RECEIVER_ID = new AtomicLong(0); + + /** + * Get an ID uniquely identifying this SpanReceiver. + */ + public final long getId() { + return id; + } + + protected SpanReceiver() { + this.id = HIGHEST_SPAN_RECEIVER_ID.incrementAndGet(); + } + + /** + * Called when a Span is stopped and can now be stored. + */ + public abstract void receiveSpan(Span span); +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java b/htrace-core4/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java new file mode 100644 index 0000000..f443ec6 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/StandardOutSpanReceiver.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; + +/** + * Used for testing. Simply prints to standard out any spans it receives. + */ +public class StandardOutSpanReceiver extends SpanReceiver { + private static final Log LOG = LogFactory.getLog(StandardOutSpanReceiver.class); + + public StandardOutSpanReceiver(HTraceConfiguration conf) { + LOG.trace("Created new StandardOutSpanReceiver."); + } + + @Override + public void receiveSpan(Span span) { + System.out.println(span); + } + + @Override + public void close() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/TimelineAnnotation.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TimelineAnnotation.java b/htrace-core4/src/main/java/org/apache/htrace/core/TimelineAnnotation.java new file mode 100644 index 0000000..18de061 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TimelineAnnotation.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +public class TimelineAnnotation { + private final long time; + private final String msg; + + public TimelineAnnotation(long time, String msg) { + this.time = time; + this.msg = msg; + } + + public long getTime() { + return time; + } + + public String getMessage() { + return msg; + } + + @Override + public String toString() { + return "@" + time + ": " + msg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/TraceCallable.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceCallable.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceCallable.java new file mode 100644 index 0000000..a0fec17 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceCallable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.Callable; + +/** + * Wrap a Callable with a Span that survives a change in threads. + */ +public class TraceCallable<V> implements Callable<V> { + private final Tracer tracer; + private final Callable<V> impl; + private final TraceScope parent; + private final String description; + + TraceCallable(Tracer tracer, TraceScope parent, Callable<V> impl, + String description) { + this.tracer = tracer; + this.impl = impl; + this.parent = parent; + if (description == null) { + this.description = Thread.currentThread().getName(); + } else { + this.description = description; + } + } + + @Override + public V call() throws Exception { + TraceScope chunk = tracer.newScope(description, + parent.getSpan().getSpanId()); + try { + return impl.call(); + } finally { + chunk.close(); + } + } + + public Callable<V> getImpl() { + return impl; + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java new file mode 100644 index 0000000..81e31ea --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class TraceExecutorService implements ExecutorService { + private final Tracer tracer; + private final String scopeName; + private final ExecutorService impl; + + TraceExecutorService(Tracer tracer, String scopeName, + ExecutorService impl) { + this.tracer = tracer; + this.scopeName = scopeName; + this.impl = impl; + } + + @Override + public void execute(Runnable command) { + impl.execute(tracer.wrap(command, scopeName)); + } + + @Override + public void shutdown() { + impl.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return impl.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return impl.isShutdown(); + } + + @Override + public boolean isTerminated() { + return impl.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return impl.awaitTermination(timeout, unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return impl.submit(tracer.wrap(task, scopeName)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return impl.submit(tracer.wrap(task, scopeName), result); + } + + @Override + public Future<?> submit(Runnable task) { + return impl.submit(tracer.wrap(task, scopeName)); + } + + private <T> Collection<? extends Callable<T>> wrapCollection( + Collection<? extends Callable<T>> tasks) { + List<Callable<T>> result = new ArrayList<Callable<T>>(); + for (Callable<T> task : tasks) { + result.add(tracer.wrap(task, scopeName)); + } + return result; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return impl.invokeAll(wrapCollection(tasks)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, TimeUnit unit) throws InterruptedException { + return impl.invokeAll(wrapCollection(tasks), timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return impl.invokeAny(wrapCollection(tasks)); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException { + return impl.invokeAny(wrapCollection(tasks), timeout, unit); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/26f2c79f/htrace-core4/src/main/java/org/apache/htrace/core/TraceRunnable.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceRunnable.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceRunnable.java new file mode 100644 index 0000000..8f98708 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceRunnable.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +/** + * Wrap a Runnable with a Span that survives a change in threads. + */ +public class TraceRunnable implements Runnable { + private final Tracer tracer; + private final TraceScope parent; + private final Runnable runnable; + private final String description; + + public TraceRunnable(Tracer tracer, TraceScope parent, + Runnable runnable, String description) { + this.tracer = tracer; + this.parent = parent; + this.runnable = runnable; + if (description == null) { + this.description = Thread.currentThread().getName(); + } else { + this.description = description; + } + } + + @Override + public void run() { + TraceScope chunk = tracer.newScope(description, + parent.getSpan().getSpanId()); + try { + runnable.run(); + } finally { + chunk.close(); + } + } + + public Runnable getRunnable() { + return runnable; + } +}
