Repository: incubator-htrace Updated Branches: refs/heads/master d84df8f33 -> fa7a97fde
HTRACE-18. Add htrace-flume, which implements a SpanReceiver that sends spans to Flume (Long Zhou via Colin P. McCabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/fa7a97fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/fa7a97fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/fa7a97fd Branch: refs/heads/master Commit: fa7a97fde8d339a114be1227eb877905d735e718 Parents: d84df8f Author: Colin P. Mccabe <[email protected]> Authored: Sun Dec 21 16:18:58 2014 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Sun Dec 21 16:18:58 2014 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/htrace/Span.java | 5 + .../htrace/impl/LocalFileSpanReceiver.java | 17 +- .../java/org/apache/htrace/impl/MilliSpan.java | 25 ++ htrace-flume/README.md | 59 ++++ htrace-flume/pom.xml | 108 +++++++ .../apache/htrace/impl/FlumeSpanReceiver.java | 283 +++++++++++++++++++ .../htrace/impl/TestFlumeSpanReceiver.java | 176 ++++++++++++ .../htrace/impl/TestHBaseSpanReceiver.java | 3 + pom.xml | 1 + 9 files changed, 661 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-core/src/main/java/org/apache/htrace/Span.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/Span.java b/htrace-core/src/main/java/org/apache/htrace/Span.java index 633bcba..b08cfc8 100644 --- a/htrace-core/src/main/java/org/apache/htrace/Span.java +++ b/htrace-core/src/main/java/org/apache/htrace/Span.java @@ -113,4 +113,9 @@ public interface Span { * @return */ String getProcessId(); + + /** + * Serialize to Json + */ + String toJson(); } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java index 627b758..7095008 100644 --- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java +++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java @@ -21,13 +21,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.htrace.HTraceConfiguration; import org.apache.htrace.Span; import org.apache.htrace.SpanReceiver; -import org.mortbay.util.ajax.JSON; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -48,7 +45,6 @@ public class LocalFileSpanReceiver implements SpanReceiver { private String file; private FileWriter fwriter; private BufferedWriter bwriter; - private Map<String, Object> values; private ExecutorService executor; private long executorTerminationTimeoutDuration; @@ -67,7 +63,6 @@ public class LocalFileSpanReceiver implements SpanReceiver { throw new RuntimeException(ioe); } this.bwriter = new BufferedWriter(fwriter); - this.values = new LinkedHashMap<String, Object>(); } @@ -81,19 +76,9 @@ public class LocalFileSpanReceiver implements SpanReceiver { @Override public void run() { try { - values.put("TraceID", span.getTraceId()); - values.put("SpanID", span.getSpanId()); - values.put("ParentID", span.getParentId()); - values.put("ProcessID", span.getProcessId()); - values.put("Start", span.getStartTimeMillis()); - values.put("Stop", span.getStopTimeMillis()); - values.put("Description", span.getDescription()); - values.put("KVAnnotations", span.getKVAnnotations()); - values.put("TLAnnotations", span.getTimelineAnnotations()); - bwriter.write(JSON.toString(values)); + bwriter.write(span.toJson()); bwriter.newLine(); bwriter.flush(); - values.clear(); } catch (IOException e) { LOG.error("Error when writing to file: " + file, e); } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java index 9d24f68..f313e61 100644 --- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java +++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java @@ -19,10 +19,12 @@ package org.apache.htrace.impl; import org.apache.htrace.Span; import org.apache.htrace.TimelineAnnotation; import org.apache.htrace.Tracer; +import org.mortbay.util.ajax.JSON; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -158,4 +160,27 @@ public class MilliSpan implements Span { public String getProcessId() { return processId; } + + @Override + public String toJson() { + Map<String, Object> values = new LinkedHashMap<String, Object>(); + values.put("TraceID", traceId); + values.put("SpanID", spanId); + values.put("ParentID", parentSpanId); + if (processId != null) { + values.put("ProcessID", processId); + } + values.put("Start", start); + values.put("Stop", stop); + if (description != null) { + values.put("Description", description); + } + if (timeline != null) { + values.put("TLAnnotations", timeline); + } + if (traceInfo != null){ + values.put("KVAnnotations", traceInfo); + } + return JSON.toString(values); + } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/README.md ---------------------------------------------------------------------- diff --git a/htrace-flume/README.md b/htrace-flume/README.md new file mode 100644 index 0000000..32bbb88 --- /dev/null +++ b/htrace-flume/README.md @@ -0,0 +1,59 @@ +htrace-flume +============ + +htrace-flume provides the span receiver which sends tracing spans to Flume collector. + +Tutorial +-------- + +1) build and deploy + + $ cd htrace/htrace-flume + $ mvn compile assembly:single + $ cp target/htrace-flume-*-jar-with-dependencies.jar $HADOOP_HOME/share/hadoop/hdfs/lib/ + +2) Edit hdfs-site.xml to include the following: + + <property> + <name>hadoop.trace.spanreceiver.classes</name> + <value>org.htrace.impl.FlumeSpanReceiver</value> + </property> + <property> + <name>hadoop.htrace.flume.hostname</name> + <value>127.0.0.1</value> + </property> + <property> + <name>hadoop.htrace.flume.port</name> + <value>60000</value> + </property> + +3) Setup flume collector + +Create flume-conf.properties file. Below is a sample that sets up an hdfs sink. + + agent.sources = avro-collection-source + agent.channels = memoryChannel + agent.sinks = loggerSink hdfs-sink + + # avro source - should match the configurations in hdfs-site.xml + agent.sources.avro-collection-source.type = avro + agent.sources.avro-collection-source.bind = 127.0.0.1 + agent.sources.avro-collection-source.port = 60000 + agent.sources.avro-collection-source.channels = memoryChannel + + #sample hdfs-sink, change to any sink that flume supports + agent.sinks.hdfs-sink.type = hdfs + agent.sinks.hdfs-sink.hdfs.path = hdfs://127.0.0.1:9000/flume + agent.sinks.hdfs-sink.channel = memoryChannel + agent.sinks.hdfs-sink.hdfs.fileType = DataStream + agent.sinks.hdfs-sink.hdfs.writeFormat = Text + agent.sinks.hdfs-sink.hdfs.rollSize = 0 + agent.sinks.hdfs-sink.hdfs.rollCount = 10000 + agent.sinks.hdfs-sink.hdfs.batchSize = 100 + + # memory channel + agent.channels.memoryChannel.capacity = 10000 + agent.channels.memoryChannel.transactionCapacity = 1000 + +Run flume agent using command "flume-ng agent -c ./conf/ -f ./conf/flume-conf.properties -n agent" + http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-flume/pom.xml b/htrace-flume/pom.xml new file mode 100644 index 0000000..1bf258b --- /dev/null +++ b/htrace-flume/pom.xml @@ -0,0 +1,108 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor +license agreements. See the NOTICE file distributed with this work for additional +information regarding copyright ownership. The ASF licenses this file to +You under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required +by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, either express or implied. See the License for the specific +language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>htrace-flume</artifactId> + <packaging>jar</packaging> + + <parent> + <artifactId>htrace</artifactId> + <groupId>org.apache.htrace</groupId> + <version>3.1.0-SNAPSHOT</version> + </parent> + + <name>htrace-flume</name> + <description>A 'SpanReceiver' implementation that sends spans to Flume collector.</description> + <url>http://incubator.apache.org/projects/htrace.html</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flume.version>1.5.2</flume.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-gpg-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <!-- explicitly define maven-deploy-plugin after other to force exec order --> + <artifactId>maven-deploy-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- Module deps. --> + <dependency> + <groupId>org.apache.htrace</groupId> + <artifactId>htrace-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.htrace</groupId> + <artifactId>htrace-core</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <!-- Global deps. --> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <!-- Flume specific deps. --> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>${flume.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>${flume.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java new file mode 100644 index 0000000..54b8a14 --- /dev/null +++ b/htrace-flume/src/main/java/org/apache/htrace/impl/FlumeSpanReceiver.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flume.Event; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcClient; +import org.apache.flume.api.RpcClientFactory; +import org.apache.flume.event.EventBuilder; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; + +public class FlumeSpanReceiver implements SpanReceiver { + private static final Log LOG = LogFactory.getLog(FlumeSpanReceiver.class); + + public static final String NUM_THREADS_KEY = "htrace.flume.num-threads"; + public static final int DEFAULT_NUM_THREADS = 1; + public static final String FLUME_HOSTNAME_KEY = "htrace.flume.hostname"; + public static final String DEFAULT_FLUME_HOSTNAME = "localhost"; + public static final String FLUME_PORT_KEY = "htrace.flume.port"; + public static final String FLUME_BATCHSIZE_KEY = "htrace.flume.batchsize"; + public static final int DEFAULT_FLUME_BATCHSIZE = 100; + + /** + * How long this receiver will try and wait for all threads to shutdown. + */ + private static final int SHUTDOWN_TIMEOUT = 30; + + /** + * How many errors in a row before we start dropping traces on the floor. + */ + private static final int MAX_ERRORS = 10; + + /** + * The queue that will get all HTrace spans that are to be sent. + */ + private final BlockingQueue<Span> queue; + + /** + * Boolean used to signal that the threads should end. + */ + private final AtomicBoolean running = new AtomicBoolean(true); + + /** + * The thread factory used to create new ExecutorService. + * <p/> + * This will be the same factory for the lifetime of this object so that + * no thread names will ever be duplicated. + */ + private final ThreadFactory tf; + + //////////////////// + /// Variables that will change on each call to configure() + /////////////////// + private ExecutorService service; + private int maxSpanBatchSize; + private String flumeHostName; + private int flumePort; + + public FlumeSpanReceiver(HTraceConfiguration conf) { + this.queue = new ArrayBlockingQueue<Span>(1000); + this.tf = new SimpleThreadFactory(); + configure(conf); + } + + private class SimpleThreadFactory implements ThreadFactory { + final AtomicLong count = new AtomicLong(0); + @Override + public Thread newThread(Runnable arg0) { + String name = String.format("flumeSpanReceiver-%d", count.getAndIncrement()); + Thread t = new Thread(arg0, name); + t.setDaemon(true); + return t; + } + } + + private void configure (HTraceConfiguration conf) { + + // Read configuration + int numThreads = conf.getInt(NUM_THREADS_KEY, DEFAULT_NUM_THREADS); + this.flumeHostName = conf.get(FLUME_HOSTNAME_KEY, DEFAULT_FLUME_HOSTNAME); + this.flumePort = conf.getInt(FLUME_PORT_KEY, 0); + if (this.flumePort == 0) { + throw new IllegalArgumentException(FLUME_PORT_KEY + " is required in configuration."); + } + this.maxSpanBatchSize = conf.getInt(FLUME_BATCHSIZE_KEY, DEFAULT_FLUME_BATCHSIZE); + + // Initialize executors + // If there are already threads running tear them down. + if (this.service != null) { + this.service.shutdownNow(); + this.service = null; + } + this.service = Executors.newFixedThreadPool(numThreads, tf); + for (int i = 0; i < numThreads; i++) { + this.service.submit(new WriteSpanRunnable()); + } + } + + private class WriteSpanRunnable implements Runnable { + private RpcClient flumeClient = null; + + /** + * This runnable sends a HTrace span to the Flume. + */ + @Override + public void run() { + List<Span> dequeuedSpans = new ArrayList<Span>(maxSpanBatchSize); + long errorCount = 0; + + while (running.get() || queue.size() > 0) { + Span firstSpan = null; + try { + // Block for up to a second. to try and get a span. + // We only block for a little bit in order to notice + // if the running value has changed + firstSpan = queue.poll(1, TimeUnit.SECONDS); + + // If the poll was successful then it's possible that there + // will be other spans to get. Try and get them. + if (firstSpan != null) { + // Add the first one that we got + dequeuedSpans.add(firstSpan); + // Try and get up to 100 queues + queue.drainTo(dequeuedSpans, maxSpanBatchSize - 1); + } + } catch (InterruptedException ie) { + // Ignored. + } + + startClient(); + if (dequeuedSpans.isEmpty()) { + continue; + } + + try { + List<Event> events = new ArrayList<Event>(dequeuedSpans.size()); + for (Span span : dequeuedSpans) { + // Headers allow Flume to filter + Map<String, String> headers = new HashMap<String, String>(); + headers.put("TraceId", Long.toString(span.getTraceId())); + headers.put("SpanId", Long.toString(span.getSpanId())); + headers.put("ProcessId", span.getProcessId()); + headers.put("Description", span.getDescription()); + + String body = span.toJson(); + + Event evt = EventBuilder.withBody(body, Charset.forName("UTF-8"), headers); + events.add(evt); + } + flumeClient.appendBatch(events); + + // clear the list for the next time through. + dequeuedSpans.clear(); + // reset the error counter. + errorCount = 0; + } catch (Exception e) { + errorCount += 1; + // If there have been ten errors in a row start dropping things. + if (errorCount < MAX_ERRORS) { + try { + queue.addAll(dequeuedSpans); + } catch (IllegalStateException ex) { + LOG.error("Drop " + dequeuedSpans.size() + + " span(s) because writing to HBase failed."); + } + } + closeClient(); + try { + // Since there was an error sleep just a little bit to try and allow the + // HBase some time to recover. + Thread.sleep(500); + } catch (InterruptedException e1) { + // Ignored + } + } + } + closeClient(); + } + + /** + * Close Flume RPC client + */ + private void closeClient() { + if (flumeClient != null) { + try { + flumeClient.close(); + } catch (FlumeException ex) { + LOG.warn("Error while trying to close Flume Rpc Client.", ex); + } finally { + flumeClient = null; + } + } + } + + /** + * Create / reconnect Flume RPC client + */ + private void startClient() { + // If current client is inactive, close it + if (flumeClient != null && !flumeClient.isActive()) { + flumeClient.close(); + flumeClient = null; + } + // Create client if needed + if (flumeClient == null) { + try { + flumeClient = RpcClientFactory.getDefaultInstance(flumeHostName, flumePort, maxSpanBatchSize); + } catch (FlumeException e) { + LOG.warn("Failed to create Flume RPC Client. " + e.getMessage()); + } + } + } + } + + /** + * Close the receiver. + * <p/> + * This tries to shutdown thread pool. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + running.set(false); + service.shutdown(); + try { + if (!service.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { + LOG.error("Was not able to process all remaining spans upon closing in: " + + SHUTDOWN_TIMEOUT + " " + TimeUnit.SECONDS + + ". Left Spans could be dropped."); + } + } catch (InterruptedException e1) { + LOG.warn("Thread interrupted when terminating executor.", e1); + } + } + + @Override + public void receiveSpan(Span span) { + if (running.get()) { + try { + this.queue.add(span); + } catch (IllegalStateException e) { + LOG.error("Error trying to append span (" + + span.getDescription() + + ") to the queue. Blocking Queue was full."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java new file mode 100644 index 0000000..a825690 --- /dev/null +++ b/htrace-flume/src/test/java/org/apache/htrace/impl/TestFlumeSpanReceiver.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import junit.framework.Assert; + +import org.apache.avro.AvroRemoteException; +import org.apache.avro.ipc.Server; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.api.RpcTestUtils; +import org.apache.flume.source.avro.AvroFlumeEvent; +import org.apache.flume.source.avro.AvroSourceProtocol; +import org.apache.flume.source.avro.Status; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceCreator; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestFlumeSpanReceiver { + private static final Log LOG = LogFactory.getLog(TestFlumeSpanReceiver.class); + + private static final String ROOT_SPAN_DESC = "ROOT"; + + private SpanReceiver spanReceiver; + private Server flumeServer; + private TraceCreator traceCreator; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Test + public void testSimpleTraces() throws FlumeException, + EventDeliveryException, IOException { + AvroHandler avroHandler = null; + List<Span> spans = null; + try { + avroHandler = new AvroHandler(); + startReceiver(null, avroHandler); + + spans = new ArrayList<Span>(); + Span rootSpan = new MilliSpan(ROOT_SPAN_DESC, 1, Span.ROOT_SPAN_ID, 100, "test"); + Span innerOne = rootSpan.child("Some good work"); + Span innerTwo = innerOne.child("Some more good work"); + innerTwo.stop(); + spans.add(innerTwo); + innerOne.stop(); + spans.add(innerOne); + rootSpan.addKVAnnotation("foo".getBytes(), "bar".getBytes()); + rootSpan.addTimelineAnnotation("timeline"); + rootSpan.stop(); + spans.add(rootSpan); + + } finally { + stopReceiver(); + } + List<AvroFlumeEvent> events = avroHandler.getAllEvents(); + Assert.assertEquals(spans.size(), events.size()); + for (int i = 0; i < spans.size(); i ++) { + String json = new String(events.get(i).getBody().array(), Charset.forName("UTF-8")); + Assert.assertTrue(json.contains(spans.get(i).getDescription())); + } + } + + @Test + public void testConcurrency() throws FlumeException, + EventDeliveryException, IOException { + try { + Map<String, String> extraConf = new HashMap<String, String>(); + extraConf.put(FlumeSpanReceiver.NUM_THREADS_KEY, "5"); + startReceiver(extraConf, new RpcTestUtils.OKAvroHandler()); + traceCreator.createThreadedTrace(); + } finally { + stopReceiver(); + } + } + + @Test + public void testResilience() throws FlumeException, + EventDeliveryException, IOException { + try { + startReceiver(null, new RpcTestUtils.FailedAvroHandler()); + traceCreator.createThreadedTrace(); + } finally { + stopReceiver(); + } + } + + private void startReceiver(Map<String, String> extraConf, AvroSourceProtocol avroHandler) { + // Start Flume server + Assert.assertNull(flumeServer); + flumeServer = RpcTestUtils.startServer(avroHandler); + + // Create and configure span receiver + Map<String, String> conf = new HashMap<String, String>(); + conf.put(FlumeSpanReceiver.FLUME_HOSTNAME_KEY, "127.0.0.1"); + conf.put(FlumeSpanReceiver.FLUME_PORT_KEY, Integer.toString(flumeServer.getPort())); + if (extraConf != null) { + conf.putAll(extraConf); + } + + spanReceiver = new FlumeSpanReceiver(HTraceConfiguration.fromMap(conf)); + + // Create trace creator, it will register our receiver + traceCreator = new TraceCreator(spanReceiver); + } + + private void stopReceiver() throws IOException { + // Close span receiver + if (spanReceiver != null) { + Trace.removeReceiver(spanReceiver); + spanReceiver.close(); + spanReceiver = null; + } + + // Close Flume server + if (flumeServer != null) { + RpcTestUtils.stopServer(flumeServer); + flumeServer = null; + } + } + + private static class AvroHandler implements AvroSourceProtocol { + private ArrayList<AvroFlumeEvent> all_events = new ArrayList<AvroFlumeEvent>(); + + public List<AvroFlumeEvent> getAllEvents() { + return new ArrayList<AvroFlumeEvent>(all_events); + } + + @Override + public Status append(AvroFlumeEvent event) throws AvroRemoteException { + all_events.add(event); + return Status.OK; + } + + @Override + public Status appendBatch(List<AvroFlumeEvent> events) throws + AvroRemoteException { + all_events.addAll(events); + return Status.OK; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java index e6a6491..d3cffe2 100644 --- a/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java +++ b/htrace-hbase/src/test/java/org/apache/htrace/impl/TestHBaseSpanReceiver.java @@ -221,5 +221,8 @@ public class TestHBaseSpanReceiver { public Span child(String description) { return null; } + + @Override + public String toJson() { return null; } } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/fa7a97fd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 10bf552..22853f3 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,7 @@ language governing permissions and limitations under the License. --> <module>htrace-core</module> <module>htrace-zipkin</module> <module>htrace-hbase</module> + <module>htrace-flume</module> </modules> <parent>
