http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java index a52f743..e3bbafa 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.master.Master; import org.apache.accumulo.trace.instrument.CountSampler; -import org.apache.accumulo.trace.instrument.Sampler; import org.apache.accumulo.trace.instrument.Trace; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ public class ReplicationDriver extends Daemon { @Override public void run() { - Sampler sampler = new CountSampler(10); + CountSampler sampler = new CountSampler(10); while (master.stillMaster()) { if (null == workMaker) { @@ -73,9 +72,7 @@ public class ReplicationDriver extends Daemon { rcrr = new RemoveCompleteReplicationRecords(conn); } - if (sampler.next()) { - Trace.on("masterReplicationDriver"); - } + Trace.on("masterReplicationDriver", sampler); // Make status markers from replication records in metadata, removing entries in // metadata which are no longer needed (closed records) @@ -109,7 +106,7 @@ public class ReplicationDriver extends Daemon { log.error("Caught Exception trying to remove finished Replication records", e); } - Trace.offNoFlush(); + Trace.off(); // Sleep for a bit long sleepMillis = conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 49bb56d..7fe1af7 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -413,8 +414,12 @@ public class Monitor { config = new ServerConfigurationFactory(instance); Accumulo.init(fs, config, app); Monitor monitor = new Monitor(); - Accumulo.enableTracing(hostname, app); - monitor.run(hostname); + DistributedTrace.enable(hostname, app, config.getConfiguration()); + try { + monitor.run(hostname); + } finally { + DistributedTrace.disable(); + } } private static long START_TIME; http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/ShowTrace.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/ShowTrace.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/ShowTrace.java index a476201..8968088 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/ShowTrace.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/trace/ShowTrace.java @@ -17,7 +17,9 @@ package org.apache.accumulo.monitor.servlets.trace; import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map.Entry; import java.util.Set; @@ -34,6 +36,7 @@ import org.apache.accumulo.core.trace.SpanTreeVisitor; import org.apache.accumulo.core.trace.TraceDump; import org.apache.accumulo.core.trace.TraceFormatter; import org.apache.accumulo.monitor.servlets.BasicServlet; +import org.apache.accumulo.trace.thrift.Annotation; import org.apache.accumulo.trace.thrift.RemoteSpan; import org.apache.hadoop.io.Text; @@ -115,7 +118,8 @@ public class ShowTrace extends Basic { sb.append(String.format("<td style='text-indent: %dpx'>%s@%s</td>%n", level * 5, node.svc, node.sender)); sb.append("<td>" + node.description + "</td>"); boolean hasData = node.data != null && !node.data.isEmpty(); - if (hasData) { + boolean hasAnnotations = node.annotations != null && !node.annotations.isEmpty(); + if (hasData || hasAnnotations) { String hexSpanId = Long.toHexString(node.spanId); sb.append("<td><input type='checkbox' id=\""); sb.append(hexSpanId); @@ -127,11 +131,23 @@ public class ShowTrace extends Basic { sb.append("</tr>\n"); sb.append("<tr id='" + Long.toHexString(node.spanId) + "' style='display:none'>"); sb.append("<td colspan='5'>\n"); - if (hasData) { + if (hasData || hasAnnotations) { sb.append(" <table class='indent,noborder'>\n"); - for (Entry<String,String> entry : node.data.entrySet()) { - sb.append(" <tr><td>" + BasicServlet.sanitize(entry.getKey()) + "</td>"); - sb.append("<td>" + BasicServlet.sanitize(entry.getValue()) + "</td></tr>\n"); + if (hasData) { + sb.append(" <tr><th>Key</th><th>Value</th></tr>\n"); + for (Entry<ByteBuffer, ByteBuffer> entry : node.data.entrySet()) { + String key = new String(entry.getKey().array(), entry.getKey().arrayOffset(), entry.getKey().limit(), UTF_8); + String value = new String(entry.getValue().array(), entry.getValue().arrayOffset(), entry.getValue().limit(), UTF_8); + sb.append(" <tr><td>" + BasicServlet.sanitize(key) + "</td>"); + sb.append("<td>" + BasicServlet.sanitize(value) + "</td></tr>\n"); + } + } + if (hasAnnotations) { + sb.append(" <tr><th>Annotation</th><th>Time Offset</th></tr>\n"); + for (Annotation entry : node.annotations) { + sb.append(" <tr><td>" + BasicServlet.sanitize(entry.getMsg()) + "</td>"); + sb.append(String.format("<td>%d</td></tr>\n", entry.getTime() - finalStart)); + } } sb.append(" </table>"); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/monitor/src/test/java/org/apache/accumulo/monitor/ShowTraceLinkTypeTest.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/test/java/org/apache/accumulo/monitor/ShowTraceLinkTypeTest.java b/server/monitor/src/test/java/org/apache/accumulo/monitor/ShowTraceLinkTypeTest.java index a630434..effb0e6 100644 --- a/server/monitor/src/test/java/org/apache/accumulo/monitor/ShowTraceLinkTypeTest.java +++ b/server/monitor/src/test/java/org/apache/accumulo/monitor/ShowTraceLinkTypeTest.java @@ -20,45 +20,47 @@ import java.util.ArrayList; import java.util.Collections; import org.apache.accumulo.trace.thrift.RemoteSpan; +import org.apache.accumulo.trace.thrift.Annotation; import org.junit.Assert; import org.junit.Test; +import java.nio.ByteBuffer; + public class ShowTraceLinkTypeTest { - + private static RemoteSpan rs(long start, long stop, String description) { + return new RemoteSpan("sender", "svc", 0l, 0l, 0l, start, stop, description, Collections.<ByteBuffer, ByteBuffer>emptyMap(), Collections.<Annotation>emptyList()); + } + @Test public void testTraceSortingForMonitor() { - /* - * public RemoteSpan(String sender, String svc, long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> - * data) - */ ArrayList<RemoteSpan> spans = new ArrayList<RemoteSpan>(10), expectedOrdering = new ArrayList<RemoteSpan>(10); - + // "Random" ordering - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 55l, 75l, "desc5", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 25l, 30l, "desc2", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 85l, 90l, "desc8", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 45l, 60l, "desc4", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 35l, 55l, "desc3", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 95l, 110l, "desc9", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 65l, 80l, "desc6", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 100l, 120l, "desc10", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 15l, 25l, "desc1", Collections.<String,String> emptyMap())); - spans.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 75l, 100l, "desc7", Collections.<String,String> emptyMap())); - + spans.add(rs(55l, 75l, "desc5")); + spans.add(rs(25l, 30l, "desc2")); + spans.add(rs(85l, 90l, "desc8")); + spans.add(rs(45l, 60l, "desc4")); + spans.add(rs(35l, 55l, "desc3")); + spans.add(rs(95l, 110l, "desc9")); + spans.add(rs(65l, 80l, "desc6")); + spans.add(rs(100l, 120l, "desc10")); + spans.add(rs(15l, 25l, "desc1")); + spans.add(rs(75l, 100l, "desc7")); + // We expect them to be sorted by 'start' - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 15l, 25l, "desc1", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 25l, 30l, "desc2", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 35l, 55l, "desc3", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 45l, 60l, "desc4", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 55l, 75l, "desc5", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 65l, 80l, "desc6", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 75l, 100l, "desc7", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 85l, 90l, "desc8", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 95l, 110l, "desc9", Collections.<String,String> emptyMap())); - expectedOrdering.add(new RemoteSpan("sender", "svc", 0l, 0l, 0l, 100l, 120l, "desc10", Collections.<String,String> emptyMap())); - + expectedOrdering.add(rs(15l, 25l, "desc1")); + expectedOrdering.add(rs(25l, 30l, "desc2")); + expectedOrdering.add(rs(35l, 55l, "desc3")); + expectedOrdering.add(rs(45l, 60l, "desc4")); + expectedOrdering.add(rs(55l, 75l, "desc5")); + expectedOrdering.add(rs(65l, 80l, "desc6")); + expectedOrdering.add(rs(75l, 100l, "desc7")); + expectedOrdering.add(rs(85l, 90l, "desc8")); + expectedOrdering.add(rs(95l, 110l, "desc9")); + expectedOrdering.add(rs(100l, 120l, "desc10")); + Collections.sort(spans); - + Assert.assertEquals(expectedOrdering, spans); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 4858b8a..af1ec56 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@ -26,7 +26,6 @@ import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; @@ -43,10 +42,10 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.user.AgeOffFilter; import org.apache.accumulo.core.security.SecurityUtil; -import org.apache.accumulo.core.trace.TraceFormatter; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.core.trace.TraceFormatter; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -225,7 +224,7 @@ public class TraceServer implements Watcher { TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); options.processor(new Processor<Iface>(new Receiver())); server = new TThreadPoolServer(options); - registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort()); + registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort(), conf.get(Property.TRACE_ZK_PATH)); writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS))); } @@ -278,9 +277,10 @@ public class TraceServer implements Watcher { } } - private void registerInZooKeeper(String name) throws Exception { - String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS; + private void registerInZooKeeper(String name, String root) throws Exception { IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + zoo.putPersistentData(root, new byte[0], NodeExistsPolicy.SKIP); + log.info("Registering tracer " + name + " at " + root); String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes(UTF_8)); zoo.exists(path, this); } @@ -297,9 +297,12 @@ public class TraceServer implements Watcher { Accumulo.init(fs, conf, app); String hostname = opts.getAddress(); TraceServer server = new TraceServer(conf, hostname); - Accumulo.enableTracing(hostname, app); - server.run(); - log.info("tracer stopping"); + try { + server.run(); + } finally { + log.info("tracer stopping"); + ZooReaderWriter.getInstance().getZooKeeper().close(); + } } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java index f7bda49..034becd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/BulkFailedCopyProcessor.java @@ -23,7 +23,6 @@ import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; -import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -53,8 +52,8 @@ public class BulkFailedCopyProcessor implements Processor { try { VolumeManager vm = VolumeManagerImpl.get(SiteConfiguration.getInstance()); - FileSystem origFs = TraceFileSystem.wrap(vm.getVolumeByPath(orig).getFileSystem()); - FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem()); + FileSystem origFs = vm.getVolumeByPath(orig).getFileSystem(); + FileSystem destFs = vm.getVolumeByPath(dest).getFileSystem(); FileUtil.copy(origFs, orig, destFs, tmp, false, true, CachedConfiguration.getInstance()); destFs.rename(tmp, dest); @@ -62,7 +61,7 @@ public class BulkFailedCopyProcessor implements Processor { } catch (IOException ex) { try { VolumeManager vm = VolumeManagerImpl.get(SiteConfiguration.getInstance()); - FileSystem destFs = TraceFileSystem.wrap(vm.getVolumeByPath(dest).getFileSystem()); + FileSystem destFs = vm.getVolumeByPath(dest).getFileSystem(); destFs.create(dest).close(); log.warn(" marked " + dest + " failed", ex); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 9a1117d..7378348 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -64,7 +64,6 @@ import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.accumulo.core.util.LocalityGroupUtil.Partitioner; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -557,7 +556,7 @@ public class InMemoryMap { private synchronized FileSKVIterator getReader() throws IOException { if (reader == null) { Configuration conf = CachedConfiguration.getInstance(); - FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf)); + FileSystem fs = FileSystem.getLocal(conf); reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance()); if (iflag != null) @@ -712,7 +711,7 @@ public class InMemoryMap { // dump memmap exactly as is to a tmp file on disk, and switch scans to that temp file try { Configuration conf = CachedConfiguration.getInstance(); - FileSystem fs = TraceFileSystem.wrap(FileSystem.getLocal(conf)); + FileSystem fs = FileSystem.getLocal(conf); String tmpFile = memDumpDir + "/memDump" + UUID.randomUUID() + "." + RFile.EXTENSION; http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 54c75f8..9fd255a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -122,6 +122,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; +import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.ColumnFQ; @@ -2876,11 +2877,13 @@ public class TabletServer implements Runnable { Accumulo.init(fs, conf, app); TabletServer server = new TabletServer(conf, fs); server.config(hostname); - Accumulo.enableTracing(hostname, app); + DistributedTrace.enable(hostname, app, conf.getConfiguration()); server.run(); } catch (Exception ex) { log.error("Uncaught exception in TabletServer.main, exiting", ex); System.exit(1); + } finally { + DistributedTrace.disable(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java index 732907d..4a899ef 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java @@ -228,7 +228,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem { // We made no status, punt on it for now, and let it re-queue itself for work return status; } finally { - Trace.offNoFlush(); + Trace.off(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index ef3a0c9..9dd4323 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -2078,11 +2078,10 @@ public class Tablet implements TabletCommitter { this.notifyAll(); } - Span curr = Trace.currentTrace(); - curr.data("extent", "" + getExtent()); + span.data("extent", "" + getExtent()); if (majCStats != null) { - curr.data("read", "" + majCStats.getEntriesRead()); - curr.data("written", "" + majCStats.getEntriesWritten()); + span.data("read", "" + majCStats.getEntriesRead()); + span.data("written", "" + majCStats.getEntriesWritten()); } } } finally { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/shell/src/main/java/org/apache/accumulo/shell/Shell.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java b/shell/src/main/java/org/apache/accumulo/shell/Shell.java index a0ff17a..996b2af 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java +++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java @@ -78,7 +78,6 @@ import org.apache.accumulo.core.util.format.Formatter; import org.apache.accumulo.core.util.format.FormatterFactory; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import org.apache.accumulo.start.classloader.vfs.ContextManager; import org.apache.accumulo.shell.commands.AboutCommand; @@ -328,8 +327,7 @@ public class Shell extends ShellOptions { } if (!options.isFake()) { - ZooReader zr = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); - DistributedTrace.enable(instance, zr, "shell", InetAddress.getLocalHost().getHostName()); + DistributedTrace.enable(InetAddress.getLocalHost().getHostName(), "shell", options.getClientConfiguration()); } this.setTableName(""); @@ -525,6 +523,7 @@ public class Shell extends ShellOptions { System.exit(shell.start()); } finally { shell.shutdown(); + DistributedTrace.disable(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/shell/src/main/java/org/apache/accumulo/shell/commands/TraceCommand.java ---------------------------------------------------------------------- diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/TraceCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/TraceCommand.java index 7f63570..819f61c 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/TraceCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/TraceCommand.java @@ -40,7 +40,7 @@ public class TraceCommand extends DebugCommand { Trace.on("shell:" + shellState.getPrincipal()); } else if (cl.getArgs()[0].equalsIgnoreCase("off")) { if (Trace.isTracing()) { - final long trace = Trace.currentTrace().traceId(); + final long trace = Trace.currentTraceId(); Trace.off(); StringBuffer sb = new StringBuffer(); int traceCount = 0; http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/test/src/main/java/org/apache/accumulo/test/TestIngest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java index 7f6c514..47033f3 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -49,9 +48,7 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.FastFormat; -import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.server.cli.ClientOnDefaultTable; -import org.apache.accumulo.trace.instrument.Trace; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.Text; @@ -181,11 +178,9 @@ public class TestIngest { Opts opts = new Opts(); BatchWriterOpts bwOpts = new BatchWriterOpts(); opts.parseArgs(TestIngest.class.getName(), args, bwOpts); - - Instance instance = opts.getInstance(); - + String name = TestIngest.class.getSimpleName(); - DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null); + DistributedTrace.enable(name); try { opts.startTracing(name); @@ -199,7 +194,8 @@ public class TestIngest { } catch (Exception e) { throw new RuntimeException(e); } finally { - Trace.off(); + opts.stopTracing(); + DistributedTrace.disable(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java index 74b03e4..902d49e 100644 --- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.cli.ScannerOpts; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; @@ -34,8 +33,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.DistributedTrace; -import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.trace.instrument.Trace; + import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; @@ -62,19 +61,19 @@ public class VerifyIngest { Opts opts = new Opts(); ScannerOpts scanOpts = new ScannerOpts(); opts.parseArgs(VerifyIngest.class.getName(), args, scanOpts); - Instance instance = opts.getInstance(); try { if (opts.trace) { String name = VerifyIngest.class.getSimpleName(); - DistributedTrace.enable(instance, new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()), name, null); + DistributedTrace.enable(); Trace.on(name); - Trace.currentTrace().data("cmdLine", Arrays.asList(args).toString()); + Trace.data("cmdLine", Arrays.asList(args).toString()); } verifyIngest(opts.getConnector(), opts, scanOpts); } finally { Trace.off(); + DistributedTrace.disable(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java index c85373d..81e519e 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -74,7 +74,6 @@ import org.apache.accumulo.core.trace.TraceDump.Printer; import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint; -import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.test.functional.BadIterator; import org.apache.accumulo.test.functional.SimpleMacIT; import org.apache.accumulo.test.functional.SlowIterator; @@ -1234,7 +1233,7 @@ public class ConditionalWriterIT extends SimpleMacIT { conn.tableOperations().create(tableName); conn.tableOperations().deleteRows("trace", null, null); - DistributedTrace.enable(conn.getInstance(), new ZooReader(conn.getInstance().getZooKeepers(), 30*1000), "testTrace", "localhost"); + DistributedTrace.enable("localhost", "testTrace", getClientConfig()); Span root = Trace.on("traceTest"); ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java index 80ee990..13bdc7e 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java @@ -34,7 +34,6 @@ import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.volume.VolumeConfiguration; -import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -59,7 +58,7 @@ public class BulkFileIT extends SimpleMacIT { c.tableOperations().addSplits(tableName, splits); Configuration conf = new Configuration(); AccumuloConfiguration aconf = DefaultConfiguration.getInstance(); - FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultVolume(conf, aconf).getFileSystem()); + FileSystem fs = VolumeConfiguration.getDefaultVolume(conf, aconf).getFileSystem(); String dir = rootPath() + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0]; http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java index 210e057..5b03b17 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java @@ -146,7 +146,7 @@ public class ExamplesIT extends ConfigurableMacIT { } assertTrue(count > 0); result = FunctionalTestUtils.readAll(cluster, TraceDumpExample.class, p); - assertTrue(result.contains("myHost@myApp")); + assertTrue(result.contains("myApp@myHost")); trace.destroy(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java index 03677f4..3f04b94 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/SimpleMacIT.java @@ -17,9 +17,11 @@ package org.apache.accumulo.test.functional; import java.io.File; +import java.io.FileNotFoundException; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.Property; @@ -28,6 +30,8 @@ import org.apache.accumulo.minicluster.MiniAccumuloInstance; import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; import org.apache.accumulo.minicluster.impl.ZooKeeperBindException; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.junit.After; import org.junit.AfterClass; @@ -41,6 +45,8 @@ import org.junit.BeforeClass; public class SimpleMacIT extends AbstractMacIT { protected static final Logger log = Logger.getLogger(SimpleMacIT.class); + private static final String INSTANCE_NAME = "instance1"; + private static File folder; private static MiniAccumuloClusterImpl cluster = null; @@ -124,7 +130,7 @@ public class SimpleMacIT extends AbstractMacIT { */ private static Connector getInstanceOneConnector() { try { - return new MiniAccumuloInstance("instance1", getInstanceOnePath()).getConnector("root", new PasswordToken(ROOT_PASSWORD)); + return new MiniAccumuloInstance(INSTANCE_NAME, getInstanceOnePath()).getConnector("root", new PasswordToken(ROOT_PASSWORD)); } catch (Exception e) { return null; } @@ -134,4 +140,14 @@ public class SimpleMacIT extends AbstractMacIT { return new File(System.getProperty("user.dir") + "/accumulo-maven-plugin/instance1"); } + protected static ClientConfiguration getClientConfig() throws FileNotFoundException, ConfigurationException { + if (getInstanceOneConnector() == null) { + return new ClientConfiguration(new PropertiesConfiguration(cluster.getConfig().getClientConfFile())); + } else { + File directory = getInstanceOnePath(); + return new ClientConfiguration(MiniAccumuloInstance.getConfigProperties(directory)).withInstance(INSTANCE_NAME) + .withZkHosts(MiniAccumuloInstance.getZooKeepersFromDir(directory)); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/pom.xml ---------------------------------------------------------------------- diff --git a/trace/pom.xml b/trace/pom.xml index aacfb56..d64e3cc 100644 --- a/trace/pom.xml +++ b/trace/pom.xml @@ -39,6 +39,10 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> + <groupId>org.htrace</groupId> + <artifactId>htrace-core</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/CloudtraceSpan.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/CloudtraceSpan.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/CloudtraceSpan.java new file mode 100644 index 0000000..fb9744e --- /dev/null +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/CloudtraceSpan.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.trace.instrument; + +import java.util.Map; + +/** + * Base interface for gathering and reporting statistics about a block of execution. + */ +public interface CloudtraceSpan { + static final long ROOT_SPAN_ID = 0; + + /** Begin gathering timing information */ + void start(); + + /** The block has completed, stop the clock */ + void stop(); + + /** Get the start time, in milliseconds */ + long getStartTimeMillis(); + + /** Get the stop time, in milliseconds */ + long getStopTimeMillis(); + + /** Return the total amount of time elapsed since start was called, if running, or difference between stop and start */ + long accumulatedMillis(); + + /** Has the span been started and not yet stopped? */ + boolean running(); + + /** Return a textual description of this span */ + String description(); + + /** A pseudo-unique (random) number assigned to this span instance */ + long spanId(); + + /** The parent span: returns null if this is the root span */ + Span parent(); + + /** A pseudo-unique (random) number assigned to the trace associated with this span */ + long traceId(); + + /** Create a child span of this span with the given description */ + Span child(String description); + + @Override + String toString(); + + /** Return the pseudo-unique (random) number of the parent span, returns ROOT_SPAN_ID if this is the root span */ + long parentId(); + + /** Add data associated with this span */ + void data(String key, String value); + + /** Get data associated with this span (read only) */ + Map<String,String> getData(); +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/CountSampler.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/CountSampler.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/CountSampler.java index 9a5bdbb..b291ee9 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/CountSampler.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/CountSampler.java @@ -16,26 +16,16 @@ */ package org.apache.accumulo.trace.instrument; -import java.util.Random; - /** - * Sampler that returns true every N calls. - * + * use org.htrace.impl.CountSampler instead */ -public class CountSampler implements Sampler { - - final static Random random = new Random(); - - final long frequency; - long count = random.nextLong(); - +public class CountSampler extends org.htrace.impl.CountSampler implements Sampler { public CountSampler(long frequency) { - this.frequency = frequency; + super(frequency); } @Override public boolean next() { - return (count++ % frequency) == 0; + return super.next(null); } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/Sampler.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/Sampler.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/Sampler.java index 4abb40a..3813530 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/Sampler.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/Sampler.java @@ -17,11 +17,9 @@ package org.apache.accumulo.trace.instrument; /** - * Extremely simple callback to determine the frequency that an action should be performed. - * - * @see Trace#wrapAll + * use org.htrace.Sampler instead */ -public interface Sampler { +public interface Sampler extends org.htrace.Sampler<Object> { boolean next(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/Span.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/Span.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/Span.java index 5267174..84275ce 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/Span.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/Span.java @@ -16,56 +16,173 @@ */ package org.apache.accumulo.trace.instrument; +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.htrace.NullScope; +import org.htrace.TimelineAnnotation; +import org.htrace.TraceScope; + +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; /** - * Base interface for gathering and reporting statistics about a block of execution. + * This is a wrapper for a TraceScope object, which is a wrapper for a Span and its parent. */ -public interface Span { - static final long ROOT_SPAN_ID = 0; - - /** Begin gathering timing information */ - void start(); - - /** The block has completed, stop the clock */ - void stop(); - - /** Get the start time, in milliseconds */ - long getStartTimeMillis(); - - /** Get the stop time, in milliseconds */ - long getStopTimeMillis(); - - /** Return the total amount of time elapsed since start was called, if running, or difference between stop and start */ - long accumulatedMillis(); - - /** Has the span been started and not yet stopped? */ - boolean running(); - - /** Return a textual description of this span */ - String description(); - - /** A pseudo-unique (random) number assigned to this span instance */ - long spanId(); - - /** The parent span: returns null if this is the root span */ - Span parent(); - - /** A pseudo-unique (random) number assigned to the trace associated with this span */ - long traceId(); - - /** Create a child span of this span with the given description */ - Span child(String description); - - @Override - String toString(); - - /** Return the pseudo-unique (random) number of the parent span, returns ROOT_SPAN_ID if this is the root span */ - long parentId(); - - /** Add data associated with this span */ - void data(String key, String value); - - /** Get data associated with this span (read only) */ - Map<String,String> getData(); +public class Span implements org.htrace.Span, CloudtraceSpan { + public static final long ROOT_SPAN_ID = org.htrace.Span.ROOT_SPAN_ID; + public static final Span NULL_SPAN = new Span(NullScope.INSTANCE); + private TraceScope scope = null; + protected org.htrace.Span span = null; + + public Span(TraceScope scope) { + this.scope = scope; + this.span = scope.getSpan(); + } + + public Span(org.htrace.Span span) { + this.span = span; + } + + public TraceScope getScope() { + return scope; + } + + public org.htrace.Span getSpan() { + return span; + } + + public long traceId() { + return span.getTraceId(); + } + + public void data(String k, String v) { + if (span != null) + span.addKVAnnotation(k.getBytes(UTF_8), v.getBytes(UTF_8)); + } + + @Override + public void stop() { + if (scope == null) { + if (span != null) { + span.stop(); + } + } else { + scope.close(); + } + } + + @Override + public long getStartTimeMillis() { + return span.getStartTimeMillis(); + } + + @Override + public long getStopTimeMillis() { + return span.getStopTimeMillis(); + } + + @Override + public long getAccumulatedMillis() { + return span.getAccumulatedMillis(); + } + + @Override + public boolean isRunning() { + return span.isRunning(); + } + + @Override + public String getDescription() { + return span.getDescription(); + } + + @Override + public long getSpanId() { + return span.getSpanId(); + } + + @Override + public long getTraceId() { + return span.getTraceId(); + } + + @Override + public Span child(String s) { + return new Span(span.child(s)); + } + + @Override + public long getParentId() { + return span.getParentId(); + } + + @Override + public void addKVAnnotation(byte[] k, byte[] v) { + span.addKVAnnotation(k, v); + } + + @Override + public void addTimelineAnnotation(String s) { + span.addTimelineAnnotation(s); + } + + @Override + public Map<byte[], byte[]> getKVAnnotations() { + return span.getKVAnnotations(); + } + + @Override + public List<TimelineAnnotation> getTimelineAnnotations() { + return span.getTimelineAnnotations(); + } + + @Override + public String getProcessId() { + return span.getProcessId(); + } + + @Override + public String toString() { + return span.toString(); + } + + public void start() { + throw new UnsupportedOperationException("can't start span"); + } + + public long accumulatedMillis() { + return getAccumulatedMillis(); + } + + public boolean running() { + return isRunning(); + } + + public String description() { + return getDescription(); + } + + public long spanId() { + return getSpanId(); + } + + public Span parent() { + throw new UnsupportedOperationException("can't get parent"); + } + + public long parentId() { + return getParentId(); + } + + @Override + public Map<String,String> getData() { + Map<byte[],byte[]> data = span.getKVAnnotations(); + HashMap<String,String> stringData = new HashMap<>(); + for (Entry<byte[],byte[]> d : data.entrySet()) { + stringData.put(new String(d.getKey(), UTF_8), new String(d.getValue(), UTF_8)); + } + return stringData; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/Trace.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/Trace.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/Trace.java index 19171c4..5ad52fb 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/Trace.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/Trace.java @@ -17,79 +17,124 @@ package org.apache.accumulo.trace.instrument; import org.apache.accumulo.trace.thrift.TInfo; +import org.htrace.TraceInfo; +import org.htrace.wrappers.TraceProxy; + +import static java.nio.charset.StandardCharsets.UTF_8; /** - * A Trace allows a user to gather global, distributed, detailed performance information while requesting a service. The general usage for a user is to do - * something like this: - * - * Trace.on("doSomething"); try { doSomething(); } finally { Trace.off(); } - * - * This updates the environment for this thread, and data collection will occur whenever the thread encounters any Span notations in the code. The information - * about the trace will also be carried over RPC calls as well. If the thread should hand off work to another thread, the environment can be carried with it, so - * that the trace continues on the new thread. + * Utility class for tracing within Accumulo. Not intended for client use! + * */ public class Trace { - - // Initiate tracing if it isn't already started + /** + * Start a trace span with a given description. + */ public static Span on(String description) { - return Tracer.getInstance().on(description); + return on(description, Sampler.ALWAYS); } - - // Turn tracing off: + + /** + * Start a trace span with a given description with the given sampler. + */ + public static <T> Span on(String description, org.htrace.Sampler<T> sampler) { + return new Span(org.htrace.Trace.startSpan(description, sampler)); + } + + /** + * Finish the current trace. + */ public static void off() { - Tracer.getInstance().stopTracing(); - Tracer.getInstance().flush(); + org.htrace.Span span = org.htrace.Trace.currentSpan(); + if (span != null) { + span.stop(); + org.htrace.Tracer.getInstance().continueSpan(null); + } } - + + /** + * @deprecated since 1.7, use {@link #off()} instead + */ + @Deprecated public static void offNoFlush() { - Tracer.getInstance().stopTracing(); + off(); } - - // Are we presently tracing? + + /** + * Returns whether tracing is currently on. + */ public static boolean isTracing() { - return Tracer.getInstance().isTracing(); + return org.htrace.Trace.isTracing(); } - - // If we are tracing, return the current span, else null + + /** + * Return the current span. + * @deprecated since 1.7 -- it is better to save the span you create in a local variable and call its methods, rather than retrieving the current span + */ + @Deprecated public static Span currentTrace() { - return Tracer.getInstance().currentTrace(); + return new Span(org.htrace.Trace.currentSpan()); } - - // Create a new time span, if tracing is on + + /** + * Get the trace id of the current span. + */ + public static long currentTraceId() { + return org.htrace.Trace.currentSpan().getTraceId(); + } + + /** + * Start a new span with a given description, if already tracing. + */ public static Span start(String description) { - return Tracer.getInstance().start(description); + return new Span(org.htrace.Trace.startSpan(description)); } - - // Start a trace in the current thread from information passed via RPC + + /** + * Continue a trace by starting a new span with a given parent and description. + */ public static Span trace(TInfo info, String description) { if (info.traceId == 0) { - return Tracer.NULL_SPAN; + return Span.NULL_SPAN; } - return Tracer.getInstance().continueTrace(description, info.traceId, info.parentId); + TraceInfo ti = new TraceInfo(info.traceId, info.parentId); + return new Span(org.htrace.Trace.startSpan(description, ti)); } - - // Initiate a trace in this thread, starting now + + /** + * Start a new span with a given description and parent. + * @deprecated since 1.7 -- use htrace API + */ + @Deprecated public static Span startThread(Span parent, String description) { - return Tracer.getInstance().startThread(parent, description); + return new Span(org.htrace.Trace.startSpan(description, parent.getSpan())); } - - // Stop a trace in this thread, starting now - public static void endThread(Span span) { - Tracer.getInstance().endThread(span); + + /** + * Add data to the current span. + */ + public static void data(String k, String v) { + org.htrace.Span span = org.htrace.Trace.currentSpan(); + if (span != null) + span.addKVAnnotation(k.getBytes(UTF_8), v.getBytes(UTF_8)); } - - // Wrap the runnable in a new span, if tracing + + /** + * Wrap a runnable in a TraceRunnable, if tracing. + */ public static Runnable wrap(Runnable runnable) { - if (isTracing()) - return new TraceRunnable(Trace.currentTrace(), runnable); - return runnable; + if (isTracing()) { + return new TraceRunnable(org.htrace.Trace.currentSpan(), runnable); + } else { + return runnable; + } } - + // Wrap all calls to the given object with spans public static <T> T wrapAll(T instance) { return TraceProxy.trace(instance); } - + // Sample trace all calls to the given object public static <T> T wrapAll(T instance, Sampler dist) { return TraceProxy.trace(instance, dist); http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceCallable.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceCallable.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceCallable.java index c3072b1..f682d61 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceCallable.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceCallable.java @@ -16,6 +16,10 @@ */ package org.apache.accumulo.trace.instrument; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; + import java.util.concurrent.Callable; /** @@ -25,27 +29,41 @@ import java.util.concurrent.Callable; public class TraceCallable<V> implements Callable<V> { private final Callable<V> impl; private final Span parent; + private final String description; TraceCallable(Callable<V> impl) { - this(Trace.currentTrace(), impl); + this(Trace.currentSpan(), impl); } TraceCallable(Span parent, Callable<V> impl) { + this(parent, impl, null); + } + + TraceCallable(Span parent, Callable<V> impl, String description) { this.impl = impl; this.parent = parent; + this.description = description; } @Override public V call() throws Exception { if (parent != null) { - Span chunk = Trace.startThread(parent, Thread.currentThread().getName()); + TraceScope chunk = Trace.startSpan(getDescription(), parent); try { return impl.call(); } finally { - Trace.endThread(chunk); + TraceExecutorService.endThread(chunk.getSpan()); } } else { return impl.call(); } } + + public Callable<V> getImpl() { + return impl; + } + + private String getDescription() { + return this.description == null ? Thread.currentThread().getName() : description; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceExecutorService.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceExecutorService.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceExecutorService.java index 04dcc39..36563d7 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceExecutorService.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceExecutorService.java @@ -16,6 +16,9 @@ */ package org.apache.accumulo.trace.instrument; +import org.htrace.Span; +import org.htrace.Tracer; + import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -108,4 +111,13 @@ public class TraceExecutorService implements ExecutorService { return impl.invokeAny(wrapCollection(tasks), timeout, unit); } + /** + * Finish a given trace and set the span for the current thread to null. + */ + public static void endThread(Span span) { + if (span != null) { + span.stop(); + Tracer.getInstance().continueSpan(null); + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceProxy.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceProxy.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceProxy.java deleted file mode 100644 index cb93210..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceProxy.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; - -import org.apache.log4j.Logger; - -public class TraceProxy { - private static final Logger log = Logger.getLogger(TraceProxy.class); - - static final Sampler ALWAYS = new Sampler() { - @Override - public boolean next() { - return true; - } - }; - - public static <T> T trace(T instance) { - return trace(instance, ALWAYS); - } - - @SuppressWarnings("unchecked") - public static <T> T trace(final T instance, final Sampler sampler) { - InvocationHandler handler = new InvocationHandler() { - @Override - public Object invoke(Object obj, Method method, Object[] args) throws Throwable { - Span span = null; - if (sampler.next()) { - span = Trace.on(method.getName()); - } - try { - return method.invoke(instance, args); - // Can throw RuntimeException, Error, or any checked exceptions of the method. - } catch (InvocationTargetException ite) { - Throwable cause = ite.getCause(); - if (cause == null) { - // This should never happen, but account for it anyway - log.error("Invocation exception during trace with null cause: ", ite); - throw new RuntimeException(ite); - } - throw cause; - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } finally { - if (span != null) { - span.stop(); - } - } - } - }; - return (T) Proxy.newProxyInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), handler); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceRunnable.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceRunnable.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceRunnable.java index 41c765d..0ddeb9e 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceRunnable.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/TraceRunnable.java @@ -16,6 +16,10 @@ */ package org.apache.accumulo.trace.instrument; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; + /** * Wrap a Runnable with a Span that survives a change in threads. * @@ -24,29 +28,39 @@ public class TraceRunnable implements Runnable, Comparable<TraceRunnable> { private final Span parent; private final Runnable runnable; + private final String description; public TraceRunnable(Runnable runnable) { - this(Trace.currentTrace(), runnable); + this(Trace.currentSpan(), runnable); } public TraceRunnable(Span parent, Runnable runnable) { + this(parent, runnable, null); + } + + public TraceRunnable(Span parent, Runnable runnable, String description) { this.parent = parent; this.runnable = runnable; + this.description = description; } @Override public void run() { if (parent != null) { - Span chunk = Trace.startThread(parent, Thread.currentThread().getName()); + TraceScope chunk = Trace.startSpan(getDescription(), parent); try { runnable.run(); } finally { - Trace.endThread(chunk); + TraceExecutorService.endThread(chunk.getSpan()); } } else { runnable.run(); } } + + private String getDescription() { + return this.description == null ? Thread.currentThread().getName() : description; + } @Override public boolean equals(Object o) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/Tracer.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/Tracer.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/Tracer.java index d70aeea..246d1eb 100644 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/Tracer.java +++ b/trace/src/main/java/org/apache/accumulo/trace/instrument/Tracer.java @@ -16,129 +16,20 @@ */ package org.apache.accumulo.trace.instrument; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import org.apache.accumulo.trace.instrument.impl.NullSpan; -import org.apache.accumulo.trace.instrument.impl.RootMilliSpan; -import org.apache.accumulo.trace.instrument.receivers.SpanReceiver; import org.apache.accumulo.trace.thrift.TInfo; +import org.htrace.Span; - -/** - * A Tracer provides the implementation for collecting and distributing Spans within a process. - */ public class Tracer { - private final static Random random = new SecureRandom(); - private final List<SpanReceiver> receivers = new ArrayList<SpanReceiver>(); - - private static final ThreadLocal<Span> currentTrace = new ThreadLocal<Span>(); - public static final NullSpan NULL_SPAN = new NullSpan(); - private static final TInfo dontTrace = new TInfo(0, 0); - - private static Tracer instance = null; - - synchronized public static void setInstance(Tracer tracer) { - instance = tracer; - } - - synchronized public static Tracer getInstance() { - if (instance == null) { - instance = new Tracer(); - } - return instance; - } - + private static final TInfo DONT_TRACE = new TInfo(0, 0); + + /** + * Obtain {@link org.apache.accumulo.trace.thrift.TInfo} for the current span. + */ public static TInfo traceInfo() { - Span span = currentTrace.get(); + Span span = org.htrace.Trace.currentSpan(); if (span != null) { - return new TInfo(span.traceId(), span.spanId()); - } - return dontTrace; - } - - public Span start(String description) { - Span parent = currentTrace.get(); - if (parent == null) - return NULL_SPAN; - return push(parent.child(description)); - } - - public Span on(String description) { - Span parent = currentTrace.get(); - Span root; - if (parent == null) { - root = new RootMilliSpan(description, random.nextLong(), random.nextLong(), Span.ROOT_SPAN_ID); - } else { - root = parent.child(description); - } - return push(root); - } - - public Span startThread(Span parent, String activity) { - return push(parent.child(activity)); - } - - public void endThread(Span span) { - if (span != null) { - span.stop(); - currentTrace.set(null); - } - } - - public boolean isTracing() { - return currentTrace.get() != null; - } - - public Span currentTrace() { - return currentTrace.get(); - } - - public void stopTracing() { - endThread(currentTrace()); - } - - protected void deliver(Span span) { - for (SpanReceiver receiver : receivers) { - receiver.span(span.traceId(), span.spanId(), span.parentId(), span.getStartTimeMillis(), span.getStopTimeMillis(), span.description(), span.getData()); - - } - } - - public synchronized void addReceiver(SpanReceiver receiver) { - receivers.add(receiver); - } - - public synchronized void removeReceiver(SpanReceiver receiver) { - receivers.remove(receiver); - } - - public Span push(Span span) { - if (span != null) { - currentTrace.set(span); - span.start(); - } - return span; - } - - public void pop(Span span) { - if (span != null) { - deliver(span); - currentTrace.set(span.parent()); - } else - currentTrace.set(null); - } - - public Span continueTrace(String description, long traceId, long parentId) { - return push(new RootMilliSpan(description, traceId, random.nextLong(), parentId)); - } - - public void flush() { - for (SpanReceiver receiver : receivers) { - receiver.flush(); + return new TInfo(span.getTraceId(), span.getSpanId()); } + return DONT_TRACE; } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/MilliSpan.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/MilliSpan.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/MilliSpan.java deleted file mode 100644 index b641a2c..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/MilliSpan.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.impl; - -import java.security.SecureRandom; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Tracer; - - -/** - * A Span implementation that stores its information in milliseconds since the epoch. - */ -public class MilliSpan implements Span { - - private static final Random next = new SecureRandom(); - private long start; - private long stop; - final private Span parent; - final private String description; - final private long spanId; - final private long traceId; - private Map<String,String> traceInfo = null; - - public Span child(String description) { - return new MilliSpan(description, next.nextLong(), traceId, this); - } - - public MilliSpan(String description, long id, long traceId, Span parent) { - this.description = description; - this.spanId = id; - this.traceId = traceId; - this.parent = parent; - this.start = 0; - this.stop = 0; - } - - public synchronized void start() { - if (start > 0) - throw new IllegalStateException("Span for " + description + " has already been started"); - start = System.currentTimeMillis(); - } - - public synchronized void stop() { - if (start == 0) - throw new IllegalStateException("Span for " + description + " has not been started"); - stop = System.currentTimeMillis(); - Tracer.getInstance().pop(this); - } - - protected long currentTimeMillis() { - return System.currentTimeMillis(); - } - - public synchronized boolean running() { - return start != 0 && stop == 0; - } - - public synchronized long accumulatedMillis() { - if (start == 0) - return 0; - if (stop > 0) - return stop - start; - return currentTimeMillis() - start; - } - - public String toString() { - long parentId = parentId(); - return ("\"" + description() + "\" trace:" + Long.toHexString(traceId()) + " span:" + spanId + (parentId > 0 ? " parent:" + parentId : "") + " start:" - + start + " ms: " + Long.toString(accumulatedMillis()) + (running() ? "..." : "")); - - } - - public String description() { - return description; - } - - @Override - public long spanId() { - return spanId; - } - - @Override - public Span parent() { - return parent; - } - - @Override - public long parentId() { - if (parent == null) - return -1; - return parent.spanId(); - } - - @Override - public long traceId() { - return traceId; - } - - @Override - public long getStartTimeMillis() { - return start; - } - - @Override - public long getStopTimeMillis() { - return stop; - } - - @Override - public void data(String key, String value) { - if (traceInfo == null) - traceInfo = new HashMap<String,String>(); - traceInfo.put(key, value); - } - - @Override - public Map<String,String> getData() { - if (traceInfo == null) - return Collections.emptyMap(); - return Collections.unmodifiableMap(traceInfo); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/NullSpan.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/NullSpan.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/NullSpan.java deleted file mode 100644 index 916b6cf..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/NullSpan.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.impl; - -import java.util.Collections; -import java.util.Map; - -import org.apache.accumulo.trace.instrument.Span; - - -/** - * A Span that does nothing. Used to avoid returning and checking for nulls when we are not tracing. - * - */ -public class NullSpan implements Span { - - public NullSpan() {} - - @Override - public long accumulatedMillis() { - return 0; - } - - @Override - public String description() { - return "NullSpan"; - } - - @Override - public long getStartTimeMillis() { - return 0; - } - - @Override - public long getStopTimeMillis() { - return 0; - } - - @Override - public Span parent() { - return null; - } - - @Override - public long parentId() { - return -1; - } - - @Override - public boolean running() { - return false; - } - - @Override - public long spanId() { - return -1; - } - - @Override - public void start() {} - - @Override - public void stop() {} - - @Override - public long traceId() { - return -1; - } - - @Override - public Span child(String description) { - return this; - } - - @Override - public void data(String key, String value) {} - - @Override - public String toString() { - return "Not Tracing"; - } - - @Override - public Map<String,String> getData() { - return Collections.emptyMap(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/RootMilliSpan.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/RootMilliSpan.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/RootMilliSpan.java deleted file mode 100644 index c25e644..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/impl/RootMilliSpan.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.impl; - -/** - * Span that roots the span tree in a process, but perhaps not the whole trace. - * - */ -public class RootMilliSpan extends MilliSpan { - - final long traceId; - final long parentId; - - @Override - public long traceId() { - return traceId; - } - - public RootMilliSpan(String description, long traceId, long spanId, long parentId) { - super(description, spanId, traceId, null); - this.traceId = traceId; - this.parentId = parentId; - } - - public long parentId() { - return parentId; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java deleted file mode 100644 index 4eebd69..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/AsyncSpanReceiver.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.receivers; - -import java.util.AbstractQueue; -import java.util.HashMap; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.accumulo.trace.thrift.RemoteSpan; -import org.apache.log4j.Logger; - - -/** - * Deliver Span information periodically to a destination. - * <ul> - * <li>Send host and service information with the span. - * <li>Cache Destination objects by some key that can be extracted from the span. - * <li>Can be used to queue spans up for delivery over RPC, or for saving into a file. - * </ul> - */ -public abstract class AsyncSpanReceiver<SpanKey,Destination> implements SpanReceiver { - - private static final Logger log = Logger.getLogger(AsyncSpanReceiver.class); - - private final Map<SpanKey,Destination> clients = new HashMap<SpanKey,Destination>(); - - protected final String host; - protected final String service; - - protected abstract Destination createDestination(SpanKey key) throws Exception; - - protected abstract void send(Destination resource, RemoteSpan span) throws Exception; - - protected abstract SpanKey getSpanKey(Map<String,String> data); - - Timer timer = new Timer("SpanSender", true); - final AbstractQueue<RemoteSpan> sendQueue = new ConcurrentLinkedQueue<RemoteSpan>(); - - public AsyncSpanReceiver(String host, String service, long millis) { - this.host = host; - this.service = service; - timer.schedule(new TimerTask() { - @Override - public void run() { - try { - sendSpans(); - } catch (Exception ex) { - log.warn("Exception sending spans to destination", ex); - } - } - - }, millis, millis); - } - - void sendSpans() { - while (!sendQueue.isEmpty()) { - boolean sent = false; - RemoteSpan s = sendQueue.peek(); - if (s.stop - s.start < 1) { - synchronized (sendQueue) { - sendQueue.remove(); - sendQueue.notifyAll(); - } - continue; - } - SpanKey dest = getSpanKey(s.data); - Destination client = clients.get(dest); - if (client == null) { - try { - clients.put(dest, createDestination(dest)); - } catch (Exception ex) { - log.warn("Exception creating connection to span receiver", ex); - } - } - if (client != null) { - try { - send(client, s); - synchronized (sendQueue) { - sendQueue.remove(); - sendQueue.notifyAll(); - } - sent = true; - } catch (Exception ex) { - log.error(ex, ex); - } - } - if (!sent) - break; - } - } - - @Override - public void span(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data) { - - SpanKey dest = getSpanKey(data); - if (dest != null) { - sendQueue.add(new RemoteSpan(host, service, traceId, spanId, parentId, start, stop, description, data)); - } - } - - @Override - public void flush() { - synchronized (sendQueue) { - while (!sendQueue.isEmpty()) { - try { - sendQueue.wait(); - } catch (InterruptedException e) { - log.warn("flush interrupted"); - break; - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/LogSpans.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/LogSpans.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/LogSpans.java deleted file mode 100644 index dfed660..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/LogSpans.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.receivers; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Map; - -import org.apache.log4j.Level; - -/** - * A SpanReceiver that just logs the data using log4j. - */ -public class LogSpans implements SpanReceiver { - private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(LogSpans.class); - - static public class SpanLevel extends Level { - - private static final long serialVersionUID = 1L; - - protected SpanLevel() { - super(Level.DEBUG_INT + 150, "SPAN", Level.DEBUG_INT + 150); - } - - static public Level toLevel(int val) { - if (val == Level.DEBUG_INT + 150) - return Level.DEBUG; - return Level.toLevel(val); - } - } - - public final static Level SPAN = new SpanLevel(); - - public static String format(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data) { - String parentStr = ""; - if (parentId > 0) - parentStr = " parent:" + parentId; - String startStr = new SimpleDateFormat("HH:mm:ss.SSS").format(new Date(start)); - return String.format("%20s:%x id:%d%s start:%s ms:%d", description, traceId, spanId, parentStr, startStr, stop - start); - } - - @Override - public void span(long traceId, long spanId, long parentId, long start, long stop, String description, Map<String,String> data) { - log.log(SPAN, format(traceId, spanId, parentId, start, stop, description, data)); - } - - @Override - public void flush() {} -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/db2dda1b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java ---------------------------------------------------------------------- diff --git a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java b/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java deleted file mode 100644 index 4967d97..0000000 --- a/trace/src/main/java/org/apache/accumulo/trace/instrument/receivers/SendSpansViaThrift.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.trace.instrument.receivers; - -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Map; - -import org.apache.accumulo.trace.thrift.RemoteSpan; -import org.apache.accumulo.trace.thrift.SpanReceiver.Client; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; - - -/** - * Send Span data to a destination using thrift. - */ -public class SendSpansViaThrift extends AsyncSpanReceiver<String,Client> { - - private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(SendSpansViaThrift.class); - - private static final String THRIFT = "thrift://"; - - public SendSpansViaThrift(String host, String service, long millis) { - super(host, service, millis); - } - - @Override - protected Client createDestination(String destination) throws Exception { - if (destination == null) - return null; - try { - int portSeparatorIndex = destination.lastIndexOf(':'); - String host = destination.substring(0, portSeparatorIndex); - int port = Integer.parseInt(destination.substring(portSeparatorIndex+1)); - log.debug("Connecting to " + host + ":" + port); - InetSocketAddress addr = new InetSocketAddress(host, port); - Socket sock = new Socket(); - sock.connect(addr); - TTransport transport = new TSocket(sock); - TProtocol prot = new TBinaryProtocol(transport); - return new Client(prot); - } catch (Exception ex) { - log.error(ex, ex); - return null; - } - } - - @Override - protected void send(Client client, RemoteSpan s) throws Exception { - if (client != null) { - try { - client.span(s); - } catch (Exception ex) { - client.getInputProtocol().getTransport().close(); - client = null; - } - } - } - - protected String getSpanKey(Map<String,String> data) { - String dest = data.get("dest"); - if (dest != null && dest.startsWith(THRIFT)) { - String hostAddress = dest.substring(THRIFT.length()); - String[] hostAddr = hostAddress.split(":", 2); - if (hostAddr.length == 2) { - return hostAddress; - } - } - return null; - } - -}