ACCUMULO-898 convert accumulo to use htrace
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db2dda1b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db2dda1b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db2dda1b Branch: refs/heads/master Commit: db2dda1b7b6431a2ee57148db2de74c0432b9480 Parents: 34ca056 Author: Billie Rinaldi <[email protected]> Authored: Tue Nov 4 10:16:11 2014 -0800 Committer: Billie Rinaldi <[email protected]> Committed: Fri Nov 7 14:32:04 2014 -0800 ---------------------------------------------------------------------- assemble/bin/stop-all.sh | 2 +- assemble/pom.xml | 4 + assemble/src/main/assemblies/component.xml | 1 + core/pom.xml | 4 + .../core/client/ClientConfiguration.java | 34 +- .../org/apache/accumulo/core/conf/Property.java | 4 + .../apache/accumulo/core/conf/PropertyType.java | 3 + .../accumulo/core/trace/AsyncSpanReceiver.java | 173 ++++ .../accumulo/core/trace/DistributedTrace.java | 208 ++++- .../accumulo/core/trace/SendSpansViaThrift.java | 97 +++ .../apache/accumulo/core/trace/TraceDump.java | 3 +- .../accumulo/core/trace/TraceFormatter.java | 17 +- .../accumulo/core/trace/ZooTraceClient.java | 78 +- .../apache/accumulo/core/util/ThriftUtil.java | 7 +- .../main/asciidoc/chapters/administration.txt | 106 ++- docs/src/main/resources/distributedTracing.html | 13 +- .../examples/simple/client/TracingExample.java | 29 +- .../minicluster/MiniAccumuloInstance.java | 2 +- pom.xml | 6 + .../org/apache/accumulo/server/Accumulo.java | 9 - .../apache/accumulo/server/init/Initialize.java | 1 - .../server/trace/TraceFSDataInputStream.java | 90 -- .../accumulo/server/trace/TraceFileSystem.java | 818 ------------------- .../accumulo/server/util/AccumuloStatus.java | 3 +- .../org/apache/accumulo/server/util/ZooZap.java | 7 +- .../accumulo/gc/SimpleGarbageCollector.java | 17 +- .../java/org/apache/accumulo/master/Master.java | 5 +- .../master/replication/ReplicationDriver.java | 9 +- 
.../org/apache/accumulo/monitor/Monitor.java | 9 +- .../monitor/servlets/trace/ShowTrace.java | 26 +- .../accumulo/monitor/ShowTraceLinkTypeTest.java | 60 +- .../org/apache/accumulo/tracer/TraceServer.java | 21 +- .../tserver/BulkFailedCopyProcessor.java | 7 +- .../apache/accumulo/tserver/InMemoryMap.java | 5 +- .../apache/accumulo/tserver/TabletServer.java | 5 +- .../replication/AccumuloReplicaSystem.java | 2 +- .../apache/accumulo/tserver/tablet/Tablet.java | 7 +- .../java/org/apache/accumulo/shell/Shell.java | 5 +- .../accumulo/shell/commands/TraceCommand.java | 2 +- .../org/apache/accumulo/test/TestIngest.java | 12 +- .../org/apache/accumulo/test/VerifyIngest.java | 9 +- .../accumulo/test/ConditionalWriterIT.java | 3 +- .../accumulo/test/functional/BulkFileIT.java | 3 +- .../accumulo/test/functional/ExamplesIT.java | 2 +- .../accumulo/test/functional/SimpleMacIT.java | 18 +- trace/pom.xml | 4 + .../trace/instrument/CloudtraceSpan.java | 71 ++ .../accumulo/trace/instrument/CountSampler.java | 18 +- .../accumulo/trace/instrument/Sampler.java | 6 +- .../apache/accumulo/trace/instrument/Span.java | 213 +++-- .../apache/accumulo/trace/instrument/Trace.java | 133 ++- .../trace/instrument/TraceCallable.java | 24 +- .../trace/instrument/TraceExecutorService.java | 12 + .../accumulo/trace/instrument/TraceProxy.java | 72 -- .../trace/instrument/TraceRunnable.java | 20 +- .../accumulo/trace/instrument/Tracer.java | 127 +-- .../trace/instrument/impl/MilliSpan.java | 141 ---- .../trace/instrument/impl/NullSpan.java | 102 --- .../trace/instrument/impl/RootMilliSpan.java | 43 - .../instrument/receivers/AsyncSpanReceiver.java | 132 --- .../trace/instrument/receivers/LogSpans.java | 63 -- .../receivers/SendSpansViaThrift.java | 89 -- .../instrument/receivers/SpanReceiver.java | 28 - .../instrument/receivers/ZooSpanClient.java | 122 --- .../accumulo/trace/thrift/Annotation.java | 502 ++++++++++++ .../accumulo/trace/thrift/RemoteSpan.java | 224 ++++- 
trace/src/main/thrift/trace.thrift | 8 +- .../accumulo/trace/instrument/TracerTest.java | 63 +- 68 files changed, 1983 insertions(+), 2180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/assemble/bin/stop-all.sh ---------------------------------------------------------------------- diff --git a/assemble/bin/stop-all.sh b/assemble/bin/stop-all.sh index 4bf06c0..0af0ee1 100755 --- a/assemble/bin/stop-all.sh +++ b/assemble/bin/stop-all.sh @@ -64,5 +64,5 @@ done "${bin}/tdown.sh" echo "Cleaning all server entries in ZooKeeper" -"$ACCUMULO_HOME/bin/accumulo" org.apache.accumulo.server.util.ZooZap -master -tservers -tracers +"$ACCUMULO_HOME/bin/accumulo" org.apache.accumulo.server.util.ZooZap -master -tservers -tracers --site-file "$ACCUMULO_CONF_DIR/accumulo-site.xml" http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/assemble/pom.xml ---------------------------------------------------------------------- diff --git a/assemble/pom.xml b/assemble/pom.xml index 89a3747..c764091 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -161,6 +161,10 @@ <artifactId>jetty-util</artifactId> </dependency> <dependency> + <groupId>org.htrace</groupId> + <artifactId>htrace-core</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/assemble/src/main/assemblies/component.xml ---------------------------------------------------------------------- diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml index 599d26c..3f18da3 100644 --- a/assemble/src/main/assemblies/component.xml +++ b/assemble/src/main/assemblies/component.xml @@ -42,6 +42,7 @@ <include>org.eclipse.jetty:jetty-server</include> <include>org.eclipse.jetty:jetty-servlet</include> <include>org.eclipse.jetty:jetty-util</include> + 
<include>org.htrace:htrace-core</include> <include>org.slf4j:slf4j-api</include> <include>org.slf4j:slf4j-log4j12</include> </includes> http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 10e7d71..1cbc6df 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -103,6 +103,10 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> + <groupId>org.htrace</groupId> + <artifactId>htrace-core</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java index 39b460d..6fe61a5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java @@ -21,8 +21,11 @@ import java.io.File; import java.io.StringReader; import java.io.StringWriter; import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.accumulo.core.conf.Property; @@ -56,7 +59,10 @@ public class ClientConfiguration extends CompositeConfiguration { INSTANCE_ZK_HOST(Property.INSTANCE_ZK_HOST), INSTANCE_ZK_TIMEOUT(Property.INSTANCE_ZK_TIMEOUT), INSTANCE_NAME("instance.name", null, PropertyType.STRING, "Name of Accumulo instance to connect to"), - INSTANCE_ID("instance.id", null, PropertyType.STRING, "UUID of Accumulo instance to connect to"), ; + INSTANCE_ID("instance.id", null, PropertyType.STRING, "UUID of 
Accumulo instance to connect to"), + TRACE_SPAN_RECEIVERS(Property.TRACE_SPAN_RECEIVERS), + TRACE_SPAN_RECEIVER_PREFIX(Property.TRACE_SPAN_RECEIVER_PREFIX), + TRACE_ZK_PATH(Property.TRACE_ZK_PATH); private String key; private String defaultValue; @@ -208,6 +214,32 @@ public class ClientConfiguration extends CompositeConfiguration { return prop.getDefaultValue(); } + private void checkType(ClientProperty property, PropertyType type) { + if (!property.getType().equals(type)) { + String msg = "Configuration method intended for type " + type + " called with a " + property.getType() + " argument (" + property.getKey() + ")"; + throw new IllegalArgumentException(msg); + } + } + + /** + * Gets all properties under the given prefix in this configuration. + * + * @param property prefix property, must be of type PropertyType.PREFIX + * @return a map of property keys to values + * @throws IllegalArgumentException if property is not a prefix + */ + public Map<String,String> getAllPropertiesWithPrefix(ClientProperty property) { + checkType(property, PropertyType.PREFIX); + + Map<String,String> propMap = new HashMap<String,String>(); + Iterator<?> iter = this.getKeys(property.getKey()); + while (iter.hasNext()) { + String p = (String)iter.next(); + propMap.put(p, getString(p)); + } + return propMap; + } + /** * Sets the value of property to value * http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index a558760..fe313c1 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import 
org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; @@ -329,6 +330,9 @@ public enum Property { + "the date shown on the 'Recent Logs' monitor page"), TRACE_PREFIX("trace.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of distributed tracing."), + TRACE_SPAN_RECEIVERS("trace.span.receivers", "org.apache.accumulo.core.trace.ZooTraceClient", PropertyType.CLASSNAMELIST, "A list of span receiver classes to send trace spans"), + TRACE_SPAN_RECEIVER_PREFIX("trace.span.receiver.", null, PropertyType.PREFIX, "Prefix for span receiver configuration properties"), + TRACE_ZK_PATH("trace.span.receiver.zookeeper.path", Constants.ZTRACERS, PropertyType.STRING, "The zookeeper node where tracers are registered"), TRACE_PORT("trace.port.client", "12234", PropertyType.PORT, "The listening port for the trace server"), TRACE_TABLE("trace.table", "trace", PropertyType.STRING, "The name of the table to store distributed traces"), TRACE_USER("trace.user", "root", PropertyType.STRING, "The name of the user to store distributed traces"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java index fc20535..bf39da9 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/PropertyType.java @@ -68,6 +68,9 @@ public enum PropertyType { CLASSNAME("java class", "[\\w$.]*", "A fully qualified java class name representing a class on the classpath.\n" + "An example is 'java.lang.String', rather than 'String'"), + CLASSNAMELIST("java 
class list", "[\\w$.,]*", "A list of fully qualified java class names representing classes on the classpath.\n" + + "An example is 'java.lang.String', rather than 'String'"), + DURABILITY("durability", "(?:none|log|flush|sync)", "One of 'none', 'log', 'flush' or 'sync'."), STRING("string", ".*", http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/trace/AsyncSpanReceiver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/AsyncSpanReceiver.java b/core/src/main/java/org/apache/accumulo/core/trace/AsyncSpanReceiver.java new file mode 100644 index 0000000..379302e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/AsyncSpanReceiver.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.trace; + +import org.apache.accumulo.trace.thrift.Annotation; +import org.apache.accumulo.trace.thrift.RemoteSpan; +import org.apache.log4j.Logger; +import org.htrace.HTraceConfiguration; +import org.htrace.Span; +import org.htrace.SpanReceiver; +import org.htrace.TimelineAnnotation; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Deliver Span information periodically to a destination. + * <ul> + * <li>Send host and service information with the span. + * <li>Cache Destination objects by some key that can be extracted from the span. + * <li>Can be used to queue spans up for delivery over RPC, or for saving into a file. + * </ul> + */ +public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanReceiver { + + private static final Logger log = Logger.getLogger(AsyncSpanReceiver.class); + + private final Map<SpanKey,Destination> clients = new HashMap<SpanKey,Destination>(); + + protected String host = null; + protected String service = null; + + protected abstract Destination createDestination(SpanKey key) throws Exception; + + protected abstract void send(Destination resource, RemoteSpan span) throws Exception; + + protected abstract SpanKey getSpanKey(Map<ByteBuffer,ByteBuffer> data); + + Timer timer = new Timer("SpanSender", true); + protected final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<RemoteSpan>(); + + public AsyncSpanReceiver() { + this(1000); + } + + public AsyncSpanReceiver(long millis) { + timer.schedule(new TimerTask() { + @Override + public void run() { + try { + sendSpans(); + } catch (Exception ex) { + log.warn("Exception sending spans to 
destination", ex); + } + } + + }, millis, millis); + } + + protected void sendSpans() { + while (!sendQueue.isEmpty()) { + boolean sent = false; + RemoteSpan s = sendQueue.peek(); + SpanKey dest = getSpanKey(s.data); + Destination client = clients.get(dest); + if (client == null) { + try { + clients.put(dest, createDestination(dest)); + } catch (Exception ex) { + log.warn("Exception creating connection to span receiver", ex); + } + } + if (client != null) { + try { + send(client, s); + synchronized (sendQueue) { + sendQueue.remove(); + sendQueue.notifyAll(); + } + sent = true; + } catch (Exception ex) { + log.warn("Got error sending to " + dest + ", refreshing client", ex); + clients.remove(dest); + } + } + if (!sent) + break; + } + } + + public static Map<ByteBuffer, ByteBuffer> convertToByteBuffers(Map<byte[], byte[]> bytesMap) { + if (bytesMap == null) + return null; + Map<ByteBuffer, ByteBuffer> result = new HashMap<ByteBuffer, ByteBuffer>(); + for (Entry<byte[], byte[]> bytes : bytesMap.entrySet()) { + result.put(ByteBuffer.wrap(bytes.getKey()), ByteBuffer.wrap(bytes.getValue())); + } + return result; + } + + public static List<Annotation> convertToAnnotations(List<TimelineAnnotation> annotations) { + if (annotations == null) + return null; + List<Annotation> result = new ArrayList<Annotation>(); + for (TimelineAnnotation annotation : annotations) { + result.add(new Annotation(annotation.getTime(), annotation.getMessage())); + } + return result; + } + + @Override + public void receiveSpan(Span s) { + Map<ByteBuffer, ByteBuffer> data = convertToByteBuffers(s.getKVAnnotations()); + SpanKey dest = getSpanKey(data); + if (dest != null) { + List<Annotation> annotations = convertToAnnotations(s.getTimelineAnnotations()); + sendQueue.add(new RemoteSpan(host, service==null ? 
s.getProcessId() : service, s.getTraceId(), s.getSpanId(), s.getParentId(), + s.getStartTimeMillis(), s.getStopTimeMillis(), s.getDescription(), data, annotations)); + } + } + + @Override + public void close() { + synchronized (sendQueue) { + while (!sendQueue.isEmpty()) { + try { + sendQueue.wait(); + } catch (InterruptedException e) { + log.warn("flush interrupted"); + break; + } + } + } + } + + @Override + public void configure(HTraceConfiguration conf) { + host = conf.get(DistributedTrace.TRACE_HOST_PROPERTY, host); + if (host == null) { + try { + host = InetAddress.getLocalHost().getCanonicalHostName().toString(); + } catch (UnknownHostException e) { + host = "unknown"; + } + } + service = conf.get(DistributedTrace.TRACE_SERVICE_PROPERTY, service); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java index 83f5c26..fe9377e 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/DistributedTrace.java @@ -17,27 +17,213 @@ package org.apache.accumulo.core.trace; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; -import org.apache.accumulo.trace.instrument.Tracer; -import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import 
org.apache.accumulo.core.conf.Property; import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; +import org.htrace.HTraceConfiguration; +import org.htrace.SpanReceiver; +/** + * Utility class to enable tracing for Accumulo server processes. + * + */ public class DistributedTrace { + private static final Logger log = Logger.getLogger(DistributedTrace.class); + + private static final String HTRACE_CONF_PREFIX = "hadoop."; + + public static final String TRACE_HOST_PROPERTY = "trace.host"; + public static final String TRACE_SERVICE_PROPERTY = "trace.service"; + + public static final String TRACER_ZK_HOST = "tracer.zookeeper.host"; + public static final String TRACER_ZK_TIMEOUT = "tracer.zookeeper.timeout"; + public static final String TRACER_ZK_PATH = "tracer.zookeeper.path"; + + private static final HashSet<SpanReceiver> receivers = new HashSet<SpanReceiver>(); + + /** + * @deprecated since 1.7, use {@link DistributedTrace#enable(String, String, org.apache.accumulo.core.client.ClientConfiguration)} instead + */ + @Deprecated public static void enable(Instance instance, ZooReader zoo, String application, String address) throws IOException, KeeperException, InterruptedException { - String path = ZooUtil.getRoot(instance) + Constants.ZTRACERS; - if (address == null) { + enable(address, application); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. + */ + public static void enable() { + enable(null, null); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. + * If service name is null, the simple name of the class will be used. 
+ */ + public static void enable(String service) { + enable(null, service); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. + * If host name is null, it will be determined. + * If service name is null, the simple name of the class will be used. + */ + public static void enable(String hostname, String service) { + enable(hostname, service, ClientConfiguration.loadDefault()); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. + * If host name is null, it will be determined. + * If service name is null, the simple name of the class will be used. + * Properties required in the client configuration include {@link org.apache.accumulo.core.client.ClientConfiguration.ClientProperty#TRACE_SPAN_RECEIVERS} and any properties specific to the span receiver. + */ + public static void enable(String hostname, String service, ClientConfiguration conf) { + String spanReceivers = conf.get(ClientProperty.TRACE_SPAN_RECEIVERS); + String zookeepers = conf.get(ClientProperty.INSTANCE_ZK_HOST); + long timeout = AccumuloConfiguration.getTimeInMillis(conf.get(ClientProperty.INSTANCE_ZK_TIMEOUT)); + String zkPath = conf.get(ClientProperty.TRACE_ZK_PATH); + Map<String,String> properties = conf.getAllPropertiesWithPrefix(ClientProperty.TRACE_SPAN_RECEIVER_PREFIX); + enableTracing(hostname, service, spanReceivers, zookeepers, timeout, zkPath, properties); + } + + /** + * Enable tracing by setting up SpanReceivers for the current process. + * If host name is null, it will be determined. + * If service name is null, the simple name of the class will be used. 
+ */ + public static void enable(String hostname, String service, AccumuloConfiguration conf) { + String spanReceivers = conf.get(Property.TRACE_SPAN_RECEIVERS); + String zookeepers = conf.get(Property.INSTANCE_ZK_HOST); + long timeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); + String zkPath = conf.get(Property.TRACE_ZK_PATH); + Map<String,String> properties = conf.getAllPropertiesWithPrefix(Property.TRACE_SPAN_RECEIVER_PREFIX); + enableTracing(hostname, service, spanReceivers, zookeepers, timeout, zkPath, properties); + } + + private static void enableTracing(String hostname, String service, String spanReceivers, String zookeepers, long timeout, String zkPath, + Map<String,String> properties) { + Configuration conf = new Configuration(false); + conf.set(Property.TRACE_SPAN_RECEIVERS.toString(), spanReceivers); + + // remaining properties will be parsed through an HTraceConfiguration by SpanReceivers + setProperty(conf, TRACER_ZK_HOST, zookeepers); + setProperty(conf, TRACER_ZK_TIMEOUT, (int) timeout); + setProperty(conf, TRACER_ZK_PATH, zkPath); + for (Entry<String,String> property : properties.entrySet()) { + setProperty(conf, property.getKey().substring(Property.TRACE_SPAN_RECEIVER_PREFIX.getKey().length()), property.getValue()); + } + if (hostname != null) { + setProperty(conf, TRACE_HOST_PROPERTY, hostname); + } + if (service != null) { + setProperty(conf, TRACE_SERVICE_PROPERTY, service); + } + org.htrace.Trace.setProcessId(service); + ShutdownHookManager.get().addShutdownHook(new Runnable() { + public void run() { + Trace.off(); + closeReceivers(); + } + }, 0); + loadSpanReceivers(conf); + } + + /** + * Disable tracing by closing SpanReceivers for the current process. 
+ */ + public static void disable() { + closeReceivers(); + } + + private static synchronized void loadSpanReceivers(Configuration conf) { + if (!receivers.isEmpty()) { + log.info("Already loaded span receivers, enable tracing does not need to be called again"); + return; + } + Class<?> implClass = null; + String[] receiverNames = conf.getTrimmedStrings(Property.TRACE_SPAN_RECEIVERS.toString()); + if (receiverNames == null || receiverNames.length == 0) { + return; + } + for (String className : receiverNames) { + try { + implClass = Class.forName(className); + receivers.add(loadInstance(implClass, conf)); + log.info("SpanReceiver " + className + " was loaded successfully."); + } catch (ClassNotFoundException e) { + log.warn("Class " + className + " cannot be found.", e); + } catch (IOException e) { + log.warn("Load SpanReceiver " + className + " failed.", e); + } + } + for (SpanReceiver rcvr : receivers) { + org.htrace.Trace.addReceiver(rcvr); + } + } + + private static SpanReceiver loadInstance(Class<?> implClass, Configuration conf) throws IOException { + SpanReceiver impl; + try { + Object o = ReflectionUtils.newInstance(implClass, conf); + impl = (SpanReceiver)o; + impl.configure(wrapHadoopConf(conf)); + } catch (SecurityException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } catch (RuntimeException e) { + throw new IOException(e); + } + + return impl; + } + + private static void setProperty(Configuration conf, String key, String value) { + conf.set(HTRACE_CONF_PREFIX + key, value); + } + + private static void setProperty(Configuration conf, String key, int value) { + conf.setInt(HTRACE_CONF_PREFIX + key, value); + } + + private static HTraceConfiguration wrapHadoopConf(final Configuration conf) { + return new HTraceConfiguration() { + @Override + public String get(String key) { + return conf.get(HTRACE_CONF_PREFIX + key); + } + + @Override + public String get(String key, String defaultValue) { + return 
conf.get(HTRACE_CONF_PREFIX + key, defaultValue); + } + }; + } + + private static synchronized void closeReceivers() { + for (SpanReceiver rcvr : receivers) { try { - address = InetAddress.getLocalHost().getHostAddress().toString(); - } catch (UnknownHostException e) { - address = "unknown"; + rcvr.close(); + } catch (IOException e) { + log.warn("Unable to close SpanReceiver correctly: " + e.getMessage(), e); } } - Tracer.getInstance().addReceiver(new ZooTraceClient(zoo, path, address, application, 1000)); + receivers.clear(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/trace/SendSpansViaThrift.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/SendSpansViaThrift.java b/core/src/main/java/org/apache/accumulo/core/trace/SendSpansViaThrift.java new file mode 100644 index 0000000..87a9378 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/trace/SendSpansViaThrift.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.trace; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.accumulo.trace.thrift.RemoteSpan; +import org.apache.accumulo.trace.thrift.SpanReceiver.Client; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Send Span data to a destination using thrift. + */ +public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> { + + private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SendSpansViaThrift.class); + + private static final String THRIFT = "thrift://"; + + public SendSpansViaThrift() { + super(); + } + + public SendSpansViaThrift(long millis) { + super(millis); + } + + @Override + protected Client createDestination(String destination) throws Exception { + if (destination == null) + return null; + try { + int portSeparatorIndex = destination.lastIndexOf(':'); + String host = destination.substring(0, portSeparatorIndex); + int port = Integer.parseInt(destination.substring(portSeparatorIndex+1)); + log.debug("Connecting to " + host + ":" + port); + InetSocketAddress addr = new InetSocketAddress(host, port); + Socket sock = new Socket(); + sock.connect(addr); + TTransport transport = new TSocket(sock); + TProtocol prot = new TBinaryProtocol(transport); + return new Client(prot); + } catch (Exception ex) { + log.error(ex, ex); + return null; + } + } + + @Override + protected void send(Client client, RemoteSpan s) throws Exception { + if (client != null) { + try { + client.span(s); + } catch (Exception ex) { + client.getInputProtocol().getTransport().close(); + throw ex; + } + } + } + + private static final ByteBuffer DEST = ByteBuffer.wrap("dest".getBytes(UTF_8)); + + protected String 
getSpanKey(Map<ByteBuffer,ByteBuffer> data) { + String dest = new String(data.get(DEST).array()); + if (dest != null && dest.startsWith(THRIFT)) { + String hostAddress = dest.substring(THRIFT.length()); + String[] hostAddr = hostAddress.split(":", 2); + if (hostAddr.length == 2) { + return hostAddress; + } + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/trace/TraceDump.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceDump.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceDump.java index b44cc3e..e3f9e5a 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/TraceDump.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceDump.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.hadoop.io.Text; +import org.htrace.Span; import com.beust.jcommander.Parameter; @@ -134,7 +135,7 @@ public class TraceDump { RemoteSpan span = TraceFormatter.getRemoteSpan(entry); tree.addNode(span); start = min(start, span.start); - if (span.parentId <= 0) + if (span.parentId == Span.ROOT_SPAN_ID) count++; } out.print(String.format("Trace started at %s", TraceFormatter.formatDate(new Date(start)))); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java b/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java index 9d860d9..d6842df 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java +++ b/core/src/main/java/org/apache/accumulo/core/trace/TraceFormatter.java @@ -16,11 +16,15 @@ */ package 
org.apache.accumulo.core.trace; +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.Map.Entry; +import org.apache.accumulo.trace.thrift.Annotation; import org.apache.accumulo.trace.thrift.RemoteSpan; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -88,8 +92,17 @@ public class TraceFormatter implements Formatter { result.append(String.format(" %12s:%s%n", "parent", Long.toHexString(span.parentId))); result.append(String.format(" %12s:%s%n", "start", dateFormatter.format(span.start))); result.append(String.format(" %12s:%s%n", "ms", span.stop - span.start)); - for (Entry<String,String> entry : span.data.entrySet()) { - result.append(String.format(" %12s:%s%n", entry.getKey(), entry.getValue())); + if (span.data != null) { + for (Entry<ByteBuffer, ByteBuffer> entry : span.data.entrySet()) { + String key = new String(entry.getKey().array(), entry.getKey().arrayOffset(), entry.getKey().limit(), UTF_8); + String value = new String(entry.getValue().array(), entry.getValue().arrayOffset(), entry.getValue().limit(), UTF_8); + result.append(String.format(" %12s:%s%n", key, value)); + } + } + if (span.annotations != null) { + for (Annotation annotation : span.annotations) { + result.append(String.format(" %12s:%s:%s%n", "annotation", annotation.getMsg(), dateFormatter.format(annotation.getTime()))); + } } if (printTimeStamps) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java index 9586eaa..f53f133 100644 --- a/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java +++ 
b/core/src/main/java/org/apache/accumulo/core/trace/ZooTraceClient.java @@ -18,57 +18,93 @@ package org.apache.accumulo.core.trace; import static java.nio.charset.StandardCharsets.UTF_8; -import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.fate.zookeeper.ZooReader; -import org.apache.accumulo.trace.instrument.receivers.SendSpansViaThrift; import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; - +import org.htrace.HTraceConfiguration; /** * Find a Span collector via zookeeper and push spans there via Thrift RPC - * */ public class ZooTraceClient extends SendSpansViaThrift implements Watcher { - private static final Logger log = Logger.getLogger(ZooTraceClient.class); - - final ZooReader zoo; - final String path; + + private static final int DEFAULT_TIMEOUT = 30 * 1000; + + ZooReader zoo = null; + String path; + boolean pathExists = false; final Random random = new Random(); final List<String> hosts = new ArrayList<String>(); - - public ZooTraceClient(ZooReader zoo, String path, String host, String service, long millis) throws IOException, KeeperException, InterruptedException { - super(host, service, millis); - this.path = path; - this.zoo = zoo; - updateHosts(path, zoo.getChildren(path, this)); + + public ZooTraceClient() { + super(); } - + + public ZooTraceClient(long millis) { + super(millis); + } + @Override - synchronized protected String getSpanKey(Map<String,String> data) { + synchronized protected String getSpanKey(Map<ByteBuffer,ByteBuffer> data) { if (hosts.size() > 0) { - return hosts.get(random.nextInt(hosts.size())); + String host = hosts.get(random.nextInt(hosts.size())); + log.debug("sending data to " + host); + return host; } return null; } - + + @Override + public void 
configure(HTraceConfiguration conf) { + super.configure(conf); + String keepers = conf.get(DistributedTrace.TRACER_ZK_HOST); + if (keepers == null) + throw new IllegalArgumentException("Must configure " + DistributedTrace.TRACER_ZK_HOST); + int timeout = conf.getInt(DistributedTrace.TRACER_ZK_TIMEOUT, DEFAULT_TIMEOUT); + zoo = new ZooReader(keepers, timeout); + path = conf.get(DistributedTrace.TRACER_ZK_PATH, Constants.ZTRACERS); + process(null); + } + @Override public void process(WatchedEvent event) { + log.debug("Processing event for trace server zk watch"); try { - updateHosts(path, zoo.getChildren(path, null)); + if (pathExists || zoo.exists(path)) { + pathExists = true; + updateHosts(path, zoo.getChildren(path, this)); + } else { + zoo.exists(path, this); + } } catch (Exception ex) { log.error("unable to get destination hosts in zookeeper", ex); } } - + + @Override + protected void sendSpans() { + if (hosts.isEmpty()) { + if (!sendQueue.isEmpty()) { + log.error("No hosts to send data to, dropping queued spans"); + synchronized (sendQueue) { + sendQueue.clear(); + sendQueue.notifyAll(); + } + } + } else { + super.sendSpans(); + } + } + synchronized private void updateHosts(String path, List<String> children) { log.debug("Scanning trace hosts in zookeeper: " + path); try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java index da4e567..0edc884 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ThriftUtil.java @@ -54,19 +54,18 @@ public class ThriftUtil { private static final Logger log = Logger.getLogger(ThriftUtil.class); public static class TraceProtocol extends TCompactProtocol { + private 
Span span = null; @Override public void writeMessageBegin(TMessage message) throws TException { - Trace.start("client:" + message.name); + span = Trace.start("client:" + message.name); super.writeMessageBegin(message); } @Override public void writeMessageEnd() throws TException { super.writeMessageEnd(); - Span currentTrace = Trace.currentTrace(); - if (currentTrace != null) - currentTrace.stop(); + span.stop(); } public TraceProtocol(TTransport transport) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/docs/src/main/asciidoc/chapters/administration.txt ---------------------------------------------------------------------- diff --git a/docs/src/main/asciidoc/chapters/administration.txt b/docs/src/main/asciidoc/chapters/administration.txt index d5e73f0..e9e0126 100644 --- a/docs/src/main/asciidoc/chapters/administration.txt +++ b/docs/src/main/asciidoc/chapters/administration.txt @@ -386,47 +386,115 @@ the following properties trace.user trace.token.property.password +Other tracer configuration properties include + + trace.port.client + trace.table + +==== Configuring Tracing +Traces are collected via SpanReceivers. The default SpanReceiver +configured is org.apache.accumulo.core.trace.ZooTraceClient, which +sends spans to an Accumulo Tracer process, as discussed in the +previous section. This default can be changed to a different span +receiver, or additional span receivers can be added in a +comma-separated list, by modifying the property + + trace.span.receivers + +Individual span receivers may require their own configuration +parameters, which are grouped under the trace.span.receiver.* +prefix. The ZooTraceClient requires the following property that +indicates where the tracer servers will register themselves in +ZooKeeper. + + trace.span.receiver.zookeeper.path + +This is configured to /tracers by default. 
If multiple Accumulo +instances are sharing the same ZooKeeper quorum, take care to +configure Accumulo with unique values for this property. + +Hadoop can also be configured to send traces to Accumulo, as of +Hadoop 2.6.0, by setting the following properties in Hadoop's +core-site.xml file (the path property is optional if left as the +default). + + <property> + <name>hadoop.htrace.spanreceiver.classes</name> + <value>org.apache.accumulo.core.trace.ZooTraceClient</value> + </property> + <property> + <name>hadoop.tracer.zookeeper.host</name> + <value>zookeeperHost:2181</value> + </property> + <property> + <name>hadoop.tracer.zookeeper.path</name> + <value>/tracers</value> + </property> + +The accumulo-core, accumulo-trace, and libthrift jars must also +be placed on Hadoop's classpath. + ==== Instrumenting a Client Tracing can be used to measure a client operation, such as a scan, as the operation traverses the distributed system. To enable tracing for your application call [source,java] -DistributedTrace.enable(instance, new ZooReader(instance), hostname, "myApplication"); +import org.apache.accumulo.core.trace.DistributedTrace; +... +DistributedTrace.enable(hostname, "myApplication"); +// do some tracing +... +DistributedTrace.disable(); Once tracing has been enabled, a client can wrap an operation in a trace. [source,java] -Trace.on("Client Scan"); +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; +... +TraceScope scope = Trace.startSpan("Client Scan", Sampler.ALWAYS); BatchScanner scanner = conn.createBatchScanner(...); // Configure your scanner for (Entry entry : scanner) { } -Trace.off(); +scope.close(); Additionally, the user can create additional Spans within a Trace. +The sampler for the trace should only be specified with the first span, and subsequent spans will be collected depending on whether that first span was sampled. 
[source,java] -Trace.on("Client Update"); +TraceScope scope = Trace.startSpan("Client Update", Sampler.ALWAYS); ... -Span readSpan = Trace.start("Read"); +TraceScope readScope = Trace.startSpan("Read"); ... -readSpan.stop(); +readScope.close(); ... -Span writeSpan = Trace.start("Write"); +TraceScope writeScope = Trace.startSpan("Write"); ... -writeSpan.stop(); -Trace.off(); +writeScope.close(); +scope.close(); Like Dapper, Accumulo tracing supports user defined annotations to associate additional data with a Trace. +Checking whether currently tracing is necessary when using a sampler other than Sampler.ALWAYS. [source,java] ... int numberOfEntriesRead = 0; -Span readSpan = Trace.start("Read"); +TraceScope readScope = Trace.startSpan("Read"); // Do the read, update the counter ... -readSpan.data("Number of Entries Read", String.valueOf(numberOfEntriesRead)); +if (Trace.isTracing()) + readScope.getSpan().addKVAnnotation("Number of Entries Read".getBytes(StandardCharsets.UTF_8), + String.valueOf(numberOfEntriesRead).getBytes(StandardCharsets.UTF_8)); + +It is also possible to add timeline annotations to your spans. +This associates a string with a given timestamp between the start and stop times for a span. + +[source,java] +... +writeScope.getSpan().addTimelineAnnotation("Initiating Flush"); Some client operations may have a high volume within your application. As such, you may wish to only sample a percentage of @@ -434,18 +502,18 @@ operations for tracing. As seen below, the CountSampler can be used to help enable tracing for 1-in-1000 operations [source,java] +import org.htrace.impl.CountSampler; +... Sampler sampler = new CountSampler(1000); ... -if (sampler.next()) { - Trace.on("Read"); -} +TraceScope readScope = Trace.startSpan("Read", sampler); ... -Trace.offNoFlush(); +readScope.close(); + +Remember to close all spans and disable tracing when finished. -It should be noted that it is safe to turn off tracing even if it -isn't currently active. 
The +Trace.offNoFlush()+ should be used if the -user does not wish to have +Trace.off()+ block while flushing trace -data. +[source,java] +DistributedTrace.disable(); ==== Viewing Collected Traces To view collected traces, use the "Recent Traces" link on the Monitor http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/docs/src/main/resources/distributedTracing.html ---------------------------------------------------------------------- diff --git a/docs/src/main/resources/distributedTracing.html b/docs/src/main/resources/distributedTracing.html index 54c9095..98438da 100644 --- a/docs/src/main/resources/distributedTracing.html +++ b/docs/src/main/resources/distributedTracing.html @@ -30,13 +30,20 @@ distributed, and the typical lookup is fast.</p> <p>To provide insight into what accumulo is doing during your scan, you can turn on tracing before you do your operation:</p> <pre> - DistributedTrace.enable(instance, zooReader, hostname, "myApplication"); - Trace scanTrace = Trace.on("client:scan"); + import org.apache.accumulo.core.trace.DistributedTrace; + import org.htrace.Sampler; + import org.htrace.Trace; + import org.htrace.TraceScope; + ... 
+ + DistributedTrace.enable(hostname, "myApplication"); + TraceScope scanTrace = Trace.startSpan("client:scan", Sampler.ALWAYS); BatchScanner scanner = conn.createBatchScanner(...); // Configure your scanner for (Entry<Key, Value> entry : scanner) { } - Trace.off(); + scanTrace.close(); + DistributedTrace.disable(); </pre> http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/TracingExample.java ---------------------------------------------------------------------- diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/TracingExample.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/TracingExample.java index a542263..3a010a6 100644 --- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/TracingExample.java +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/TracingExample.java @@ -17,6 +17,7 @@ package org.apache.accumulo.examples.simple.client; +import static java.nio.charset.StandardCharsets.UTF_8; import java.util.Map.Entry; import org.apache.accumulo.core.cli.ClientOnDefaultTable; @@ -33,9 +34,9 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.DistributedTrace; -import org.apache.accumulo.fate.zookeeper.ZooReader; -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; +import org.htrace.Sampler; +import org.htrace.Trace; +import org.htrace.TraceScope; import com.beust.jcommander.Parameter; @@ -64,7 +65,7 @@ public class TracingExample { } public void enableTracing(Opts opts) throws Exception { - DistributedTrace.enable(opts.getInstance(), new ZooReader(opts.getInstance().getZooKeepers(), 1000), "myHost", "myApp"); + DistributedTrace.enable("myHost", "myApp"); } public void 
execute(Opts opts) throws TableNotFoundException, InterruptedException, AccumuloException, AccumuloSecurityException, TableExistsException { @@ -91,22 +92,21 @@ public class TracingExample { // Trace the write operation. Note, unless you flush the BatchWriter, you will not capture // the write operation as it is occurs asynchronously. You can optionally create additional Spans // within a given Trace as seen below around the flush - Trace.on("Client Write"); + TraceScope scope = Trace.startSpan("Client Write", Sampler.ALWAYS); - System.out.println("TraceID: " + Long.toHexString(Trace.currentTrace().traceId())); + System.out.println("TraceID: " + Long.toHexString(scope.getSpan().getTraceId())); BatchWriter batchWriter = opts.getConnector().createBatchWriter(opts.getTableName(), new BatchWriterConfig()); Mutation m = new Mutation("row"); m.put("cf", "cq", "value"); batchWriter.addMutation(m); - Span flushSpan = Trace.start("Client Flush"); + // You can add timeline annotations to Spans which will be able to be viewed in the Monitor + scope.getSpan().addTimelineAnnotation("Initiating Flush"); batchWriter.flush(); - flushSpan.stop(); - // Use Trace.offNoFlush() if you don't want the operation to block. batchWriter.close(); - Trace.off(); + scope.close(); } private void readEntries(Opts opts) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { @@ -114,8 +114,8 @@ public class TracingExample { Scanner scanner = opts.getConnector().createScanner(opts.getTableName(), opts.auths); // Trace the read operation. 
- Span readSpan = Trace.on("Client Read"); - System.out.println("TraceID: " + Long.toHexString(Trace.currentTrace().traceId())); + TraceScope readScope = Trace.startSpan("Client Read", Sampler.ALWAYS); + System.out.println("TraceID: " + Long.toHexString(readScope.getSpan().getTraceId())); int numberOfEntriesRead = 0; for (Entry<Key,Value> entry : scanner) { @@ -123,9 +123,10 @@ public class TracingExample { ++numberOfEntriesRead; } // You can add additional metadata (key, values) to Spans which will be able to be viewed in the Monitor - readSpan.data("Number of Entries Read", String.valueOf(numberOfEntriesRead)); + readScope.getSpan().addKVAnnotation("Number of Entries Read".getBytes(UTF_8), + String.valueOf(numberOfEntriesRead).getBytes(UTF_8)); - Trace.off(); + readScope.close(); } public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloInstance.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloInstance.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloInstance.java index 54897cb..e0c93a6 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloInstance.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloInstance.java @@ -50,7 +50,7 @@ public class MiniAccumuloInstance extends ZooKeeperInstance { } } - private static String getZooKeepersFromDir(File directory) throws FileNotFoundException { + public static String getZooKeepersFromDir(File directory) throws FileNotFoundException { if (!directory.isDirectory()) throw new IllegalArgumentException("Not a directory " + directory.getPath()); File configFile = new File(new File(directory, "conf"), "accumulo-site.xml"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/pom.xml 
---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ebc2f2f..e9338f4 100644 --- a/pom.xml +++ b/pom.xml @@ -124,6 +124,7 @@ <forkCount>1</forkCount> <!-- overwritten in profiles hadoop-1 or hadoop-2 --> <hadoop.version>2.2.0</hadoop.version> + <htrace.version>3.0.4</htrace.version> <httpclient.version>3.1</httpclient.version> <jetty.version>9.1.5.v20140505</jetty.version> <!-- the maven-release-plugin makes this recommendation, due to plugin bugs --> @@ -462,6 +463,11 @@ <version>${jetty.version}</version> </dependency> <dependency> + <groupId>org.htrace</groupId> + <artifactId>htrace-core</artifactId> + <version>${htrace.version}</version> + </dependency> + <dependency> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty</artifactId> <version>6.1.26</version> http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java index 5c93a53..87c615e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -32,7 +32,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.Version; @@ -107,14 +106,6 @@ public class Accumulo { return ServerConstants.getInstanceIdLocation(v); } - public static void enableTracing(String address, String application) { - try { - 
DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address); - } catch (Exception ex) { - log.error("creating remote sink for trace spans", ex); - } - } - /** * Finds the best log4j configuration file. A generic file is used only if an * application-specific file is not available. An XML file is preferred over http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 24ff637..a15e05e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -477,7 +477,6 @@ public class Initialize { zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL); - zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java ---------------------------------------------------------------------- diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java deleted file mode 100644 index 5162e01..0000000 --- a/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFSDataInputStream.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.server.trace; - -import java.io.IOException; - -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; -import org.apache.hadoop.fs.FSDataInputStream; - - -public class TraceFSDataInputStream extends FSDataInputStream { - @Override - public synchronized void seek(long desired) throws IOException { - Span span = Trace.start("FSDataInputStream.seek"); - try { - impl.seek(desired); - } finally { - span.stop(); - } - } - - @Override - public int read(long position, byte[] buffer, int offset, int length) throws IOException { - Span span = Trace.start("FSDataInputStream.read"); - if (Trace.isTracing()) - span.data("length", Integer.toString(length)); - try { - return impl.read(position, buffer, offset, length); - } finally { - span.stop(); - } - } - - @Override - public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - Span span = Trace.start("FSDataInputStream.readFully"); - if (Trace.isTracing()) - span.data("length", Integer.toString(length)); - try { - impl.readFully(position, buffer, offset, length); - } finally { - span.stop(); - } - } - - @Override - public void readFully(long position, byte[] buffer) throws IOException { - Span span = Trace.start("FSDataInputStream.readFully"); - if (Trace.isTracing()) - span.data("length", Integer.toString(buffer.length)); - try { - impl.readFully(position, buffer); - } finally { - span.stop(); - } - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - Span span = Trace.start("FSDataInputStream.seekToNewSource"); - try { - return impl.seekToNewSource(targetPos); - } finally { - span.stop(); - } - } - - private final FSDataInputStream impl; - - public TraceFSDataInputStream(FSDataInputStream in) throws IOException { - super(in); - impl = in; - } - -} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java b/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java deleted file mode 100644 index d3fbad7..0000000 --- a/server/base/src/main/java/org/apache/accumulo/server/trace/TraceFileSystem.java +++ /dev/null @@ -1,818 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.server.trace; - -import static com.google.common.base.Preconditions.checkArgument; -import java.io.IOException; -import java.net.URI; - -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileChecksum; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Progressable; - -// If FileSystem was an interface, we could use a Proxy, but it's not, so we have to override everything manually - -public class TraceFileSystem extends FileSystem { - - @Override - public void setConf(Configuration conf) { - Span span = Trace.start("setConf"); - try { - if (impl != null) - impl.setConf(conf); - else - super.setConf(conf); - } finally { - span.stop(); - } - } - - @Override - public Configuration getConf() { - Span span = Trace.start("getConf"); - try { - return impl.getConf(); - } finally { - span.stop(); - } - } - - @Override - public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { - Span span = Trace.start("getFileBlockLocations"); - try { - return impl.getFileBlockLocations(file, start, len); - } finally { - span.stop(); - } - } - - @Override - public FSDataInputStream open(Path f) throws IOException { - Span span = Trace.start("open"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return new TraceFSDataInputStream(impl.open(f)); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f) throws IOException { - Span span = Trace.start("create"); 
- if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, boolean overwrite) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, overwrite); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, Progressable progress) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - - return impl.create(f, progress); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, short replication) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, replication); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, short replication, Progressable progress) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, replication, progress); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, overwrite, bufferSize); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, overwrite, bufferSize, progress); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, boolean overwrite, int 
bufferSize, short replication, long blockSize) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, overwrite, bufferSize, replication, blockSize); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - Span span = Trace.start("create"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.create(f, overwrite, bufferSize, replication, blockSize, progress); - } finally { - span.stop(); - } - } - - @Override - public boolean createNewFile(Path f) throws IOException { - Span span = Trace.start("createNewFile"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.createNewFile(f); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream append(Path f) throws IOException { - Span span = Trace.start("append"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.append(f); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream append(Path f, int bufferSize) throws IOException { - Span span = Trace.start("append"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.append(f, bufferSize); - } finally { - span.stop(); - } - } - - @Deprecated - @Override - public short getReplication(Path src) throws IOException { - Span span = Trace.start("getReplication"); - if (Trace.isTracing()) - span.data("path", src.toString()); - try { - return impl.getFileStatus(src).getReplication(); - } finally { - span.stop(); - } - } - - @Override - public boolean setReplication(Path src, short replication) throws IOException { - Span span = Trace.start("setReplication"); - if (Trace.isTracing()) - span.data("path", src.toString()); - try { - return impl.setReplication(src, 
replication); - } finally { - span.stop(); - } - } - - @Override - public boolean exists(Path f) throws IOException { - Span span = Trace.start("exists"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.exists(f); - } finally { - span.stop(); - } - } - - @Deprecated - @Override - public boolean isDirectory(Path f) throws IOException { - Span span = Trace.start("isDirectory"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.getFileStatus(f).isDir(); - } finally { - span.stop(); - } - } - - @Override - public boolean isFile(Path f) throws IOException { - Span span = Trace.start("isFile"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.isFile(f); - } finally { - span.stop(); - } - } - - @SuppressWarnings("deprecation") - @Override - public long getLength(Path f) throws IOException { - Span span = Trace.start("getLength"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.getLength(f); - } finally { - span.stop(); - } - } - - @Override - public ContentSummary getContentSummary(Path f) throws IOException { - Span span = Trace.start("getContentSummary"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.getContentSummary(f); - } finally { - span.stop(); - } - } - - @Override - public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException { - Span span = Trace.start("listStatus"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.listStatus(f, filter); - } finally { - span.stop(); - } - } - - @Override - public FileStatus[] listStatus(Path[] files) throws IOException { - Span span = Trace.start("listStatus"); - try { - return impl.listStatus(files); - } finally { - span.stop(); - } - } - - @Override - public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException { - Span span = Trace.start("listStatus"); - try { - return 
impl.listStatus(files, filter); - } finally { - span.stop(); - } - } - - @Override - public FileStatus[] globStatus(Path pathPattern) throws IOException { - Span span = Trace.start("globStatus"); - if (Trace.isTracing()) - span.data("pattern", pathPattern.toString()); - try { - return impl.globStatus(pathPattern); - } finally { - span.stop(); - } - } - - @Override - public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException { - Span span = Trace.start("globStatus"); - if (Trace.isTracing()) - span.data("pattern", pathPattern.toString()); - try { - return impl.globStatus(pathPattern, filter); - } finally { - span.stop(); - } - } - - @Override - public Path getHomeDirectory() { - Span span = Trace.start("getHomeDirectory"); - try { - return impl.getHomeDirectory(); - } finally { - span.stop(); - } - } - - @Override - public boolean mkdirs(Path f) throws IOException { - Span span = Trace.start("mkdirs"); - if (Trace.isTracing()) - span.data("path", f.toString()); - try { - return impl.mkdirs(f); - } finally { - span.stop(); - } - } - - @Override - public void copyFromLocalFile(Path src, Path dst) throws IOException { - Span span = Trace.start("copyFromLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.copyFromLocalFile(src, dst); - } finally { - span.stop(); - } - } - - @Override - public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException { - Span span = Trace.start("moveFromLocalFile"); - if (Trace.isTracing()) { - span.data("dst", dst.toString()); - } - try { - impl.moveFromLocalFile(srcs, dst); - } finally { - span.stop(); - } - } - - @Override - public void moveFromLocalFile(Path src, Path dst) throws IOException { - Span span = Trace.start("moveFromLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.moveFromLocalFile(src, dst); - } finally { - span.stop(); - } - } - 
- @Override - public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException { - Span span = Trace.start("copyFromLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.copyFromLocalFile(delSrc, src, dst); - } finally { - span.stop(); - } - } - - @Override - public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst) throws IOException { - Span span = Trace.start("copyFromLocalFile"); - if (Trace.isTracing()) { - span.data("dst", dst.toString()); - } - try { - impl.copyFromLocalFile(delSrc, overwrite, srcs, dst); - } finally { - span.stop(); - } - } - - @Override - public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException { - Span span = Trace.start("copyFromLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.copyFromLocalFile(delSrc, overwrite, src, dst); - } finally { - span.stop(); - } - } - - @Override - public void copyToLocalFile(Path src, Path dst) throws IOException { - Span span = Trace.start("copyFromLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.copyToLocalFile(src, dst); - } finally { - span.stop(); - } - } - - @Override - public void moveToLocalFile(Path src, Path dst) throws IOException { - Span span = Trace.start("moveToLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.moveToLocalFile(src, dst); - } finally { - span.stop(); - } - } - - @Override - public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException { - Span span = Trace.start("copyToLocalFile"); - if (Trace.isTracing()) { - span.data("src", src.toString()); - span.data("dst", dst.toString()); - } - try { - impl.copyToLocalFile(delSrc, src, dst); - } finally { - 
span.stop(); - } - } - - @Override - public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException { - Span span = Trace.start("startLocalOutput"); - if (Trace.isTracing()) { - span.data("out", fsOutputFile.toString()); - span.data("local", tmpLocalFile.toString()); - } - try { - return impl.startLocalOutput(fsOutputFile, tmpLocalFile); - } finally { - span.stop(); - } - } - - @Override - public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException { - Span span = Trace.start("completeLocalOutput"); - if (Trace.isTracing()) { - span.data("out", fsOutputFile.toString()); - span.data("local", tmpLocalFile.toString()); - } - try { - impl.completeLocalOutput(fsOutputFile, tmpLocalFile); - } finally { - span.stop(); - } - } - - @Override - public void close() throws IOException { - Span span = Trace.start("close"); - try { - impl.close(); - } finally { - span.stop(); - } - } - - @Override - public long getUsed() throws IOException { - Span span = Trace.start("getUsed"); - try { - return impl.getUsed(); - } finally { - span.stop(); - } - } - - @SuppressWarnings("deprecation") - @Override - public long getBlockSize(Path f) throws IOException { - Span span = Trace.start("getBlockSize"); - if (Trace.isTracing()) { - span.data("path", f.toString()); - } - try { - return impl.getBlockSize(f); - } finally { - span.stop(); - } - } - - @Deprecated - @Override - public long getDefaultBlockSize() { - Span span = Trace.start("getDefaultBlockSize"); - try { - return impl.getDefaultBlockSize(); - } finally { - span.stop(); - } - } - - @Deprecated - @Override - public short getDefaultReplication() { - Span span = Trace.start("getDefaultReplication"); - try { - return impl.getDefaultReplication(); - } finally { - span.stop(); - } - } - - @Override - public FileChecksum getFileChecksum(Path f) throws IOException { - Span span = Trace.start("getFileChecksum"); - if (Trace.isTracing()) { - span.data("path", f.toString()); - } - try { - 
return impl.getFileChecksum(f); - } finally { - span.stop(); - } - } - - @Override - public void setVerifyChecksum(boolean verifyChecksum) { - Span span = Trace.start("setVerifyChecksum"); - try { - impl.setVerifyChecksum(verifyChecksum); - } finally { - span.stop(); - } - } - - @Override - public void setPermission(Path p, FsPermission permission) throws IOException { - Span span = Trace.start("setPermission"); - if (Trace.isTracing()) { - span.data("path", p.toString()); - } - try { - impl.setPermission(p, permission); - } finally { - span.stop(); - } - } - - @Override - public void setOwner(Path p, String username, String groupname) throws IOException { - Span span = Trace.start("setOwner"); - if (Trace.isTracing()) { - span.data("path", p.toString()); - span.data("user", username); - span.data("group", groupname); - } - - try { - impl.setOwner(p, username, groupname); - } finally { - span.stop(); - } - } - - @Override - public void setTimes(Path p, long mtime, long atime) throws IOException { - Span span = Trace.start("setTimes"); - try { - impl.setTimes(p, mtime, atime); - } finally { - span.stop(); - } - } - - final FileSystem impl; - - TraceFileSystem(FileSystem impl) { - checkArgument(impl != null, "impl is null"); - this.impl = impl; - } - - public FileSystem getImplementation() { - return impl; - } - - @Override - public URI getUri() { - Span span = Trace.start("getUri"); - try { - return impl.getUri(); - } finally { - span.stop(); - } - } - - @Override - public FSDataInputStream open(Path f, int bufferSize) throws IOException { - Span span = Trace.start("open"); - try { - return new TraceFSDataInputStream(impl.open(f, bufferSize)); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) - throws IOException { - Span span = Trace.start("create"); - try { - return impl.create(f, overwrite, bufferSize, 
replication, blockSize, progress); - } finally { - span.stop(); - } - } - - @Override - public void initialize(URI name, Configuration conf) throws IOException { - Span span = Trace.start("initialize"); - try { - impl.initialize(name, conf); - } finally { - span.stop(); - } - } - - @Override - public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { - Span span = Trace.start("append"); - try { - return impl.append(f, bufferSize, progress); - } finally { - span.stop(); - } - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - Span span = Trace.start("rename"); - try { - return impl.rename(src, dst); - } finally { - span.stop(); - } - } - - @SuppressWarnings("deprecation") - @Override - public boolean delete(Path f) throws IOException { - Span span = Trace.start("delete"); - try { - return impl.delete(f); - } finally { - span.stop(); - } - } - - @Override - public boolean delete(Path f, boolean recursive) throws IOException { - Span span = Trace.start("delete"); - try { - return impl.delete(f, recursive); - } finally { - span.stop(); - } - } - - @Override - public FileStatus[] listStatus(Path f) throws IOException { - Span span = Trace.start("listStatus"); - try { - return impl.listStatus(f); - } finally { - span.stop(); - } - } - - @Override - public void setWorkingDirectory(Path new_dir) { - Span span = Trace.start("setWorkingDirectory"); - try { - impl.setWorkingDirectory(new_dir); - } finally { - span.stop(); - } - } - - @Override - public Path getWorkingDirectory() { - Span span = Trace.start("getWorkingDirectory"); - try { - return impl.getWorkingDirectory(); - } finally { - span.stop(); - } - } - - @Override - public boolean mkdirs(Path f, FsPermission permission) throws IOException { - Span span = Trace.start("mkdirs"); - try { - return impl.mkdirs(f, permission); - } finally { - span.stop(); - } - } - - @Override - public FileStatus getFileStatus(Path f) throws IOException { - Span 
span = Trace.start("getFileStatus"); - try { - return impl.getFileStatus(f); - } finally { - span.stop(); - } - } - - public static FileSystem wrap(FileSystem fileSystem) { - return new TraceFileSystem(fileSystem); - } - - public static FileSystem getAndWrap(Configuration conf) throws IOException { - return wrap(FileSystem.get(conf)); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java index 6c7fd47..7bd5f6d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java @@ -50,7 +50,8 @@ public class AccumuloStatus { if (!reader.getChildren(rootPath + Constants.ZTSERVERS + "/" + child).isEmpty()) return false; } - if (!reader.getChildren(rootPath + Constants.ZTRACERS).isEmpty()) + // TODO: check configured tracers location instead of default + if (!reader.getChildren(Constants.ZTRACERS).isEmpty()) return false; if (!reader.getChildren(rootPath + Constants.ZMASTER_LOCK).isEmpty()) return false; http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 1f59531..7fdbf13 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -19,6 +19,7 @@ package org.apache.accumulo.server.util; import java.util.List; import 
org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.server.cli.ClientOpts; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -48,6 +49,10 @@ public class ZooZap { boolean zapTracers = false; @Parameter(names="-verbose", description="print out messages about progress") boolean verbose = false; + + String getTraceZKPath() { + return super.getClientConfiguration().get(ClientProperty.TRACE_ZK_PATH); + } } public static void main(String[] args) { @@ -93,7 +98,7 @@ public class ZooZap { } if (opts.zapTracers) { - String path = Constants.ZROOT + "/" + iid + Constants.ZTRACERS; + String path = opts.getTraceZKPath(); zapDirectory(zoo, path); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 01fd2c8..720d18b 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.SecurityUtil; import org.apache.accumulo.core.security.thrift.TCredentials; +import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; @@ -97,7 +98,6 @@ import org.apache.accumulo.server.util.TServerUtils; import org.apache.accumulo.server.util.TabletIterator; import 
org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.trace.instrument.CountSampler; -import org.apache.accumulo.trace.instrument.Sampler; import org.apache.accumulo.trace.instrument.Span; import org.apache.accumulo.trace.instrument.Trace; import org.apache.accumulo.trace.thrift.TInfo; @@ -158,8 +158,12 @@ public class SimpleGarbageCollector implements Iface { AccumuloConfiguration config = conf.getConfiguration(); gc.init(fs, instance, SystemCredentials.get(), config); - Accumulo.enableTracing(opts.getAddress(), app); - gc.run(); + DistributedTrace.enable(opts.getAddress(), app, config); + try { + gc.run(); + } finally { + DistributedTrace.disable(); + } } /** @@ -568,11 +572,10 @@ public class SimpleGarbageCollector implements Iface { return; } - Sampler sampler = new CountSampler(100); + CountSampler sampler = new CountSampler(100); while (true) { - if (sampler.next()) - Trace.on("gc"); + Trace.on("gc", sampler); Span gcSpan = Trace.start("loop"); tStart = System.currentTimeMillis(); @@ -634,7 +637,7 @@ public class SimpleGarbageCollector implements Iface { log.warn(e, e); } - Trace.offNoFlush(); + Trace.off(); try { long gcDelay = config.getTimeInMillis(Property.GC_CYCLE_DELAY); log.debug("Sleeping for " + gcDelay + " milliseconds"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index b6b96a0..bbd2396 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.security.Credentials; import org.apache.accumulo.core.security.NamespacePermission; import org.apache.accumulo.core.security.SecurityUtil; import 
org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; @@ -1259,11 +1260,13 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt VolumeManager fs = VolumeManagerImpl.get(); Accumulo.init(fs, conf, app); Master master = new Master(conf, fs, hostname); - Accumulo.enableTracing(hostname, app); + DistributedTrace.enable(hostname, app, conf.getConfiguration()); master.run(); } catch (Exception ex) { log.error("Unexpected exception, exiting", ex); System.exit(1); + } finally { + DistributedTrace.disable(); } }
