Repository: incubator-htrace Updated Branches: refs/heads/master 195ecee7d -> 9ebdafb45
HTRACE-235. htrace-zipkin - add Kafka transport support (Cosmin Lehene via Colin P. McCabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/9ebdafb4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/9ebdafb4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/9ebdafb4 Branch: refs/heads/master Commit: 9ebdafb45df03a4b0d90c77af65e0bd21c09ec1a Parents: 195ecee Author: Colin P. Mccabe <[email protected]> Authored: Sat Oct 3 23:52:32 2015 -0700 Committer: Colin P. Mccabe <[email protected]> Committed: Sat Oct 3 23:52:32 2015 -0700 ---------------------------------------------------------------------- htrace-zipkin/README.md | 50 +++++ htrace-zipkin/pom.xml | 45 +++++ .../main/java/org/apache/htrace/Transport.java | 50 +++++ .../org/apache/htrace/impl/KafkaTransport.java | 113 ++++++++++++ .../org/apache/htrace/impl/ScribeTransport.java | 158 ++++++++++++++++ .../apache/htrace/impl/ZipkinSpanReceiver.java | 183 +++++++++---------- .../htrace/TestHTraceSpanToZipkinSpan.java | 154 ---------------- .../htrace/impl/TestZipkinSpanReceiver.java | 97 +++++----- .../apache/htrace/zipkin/ITZipkinReceiver.java | 114 ++++++++++++ .../zipkin/TestHTraceSpanToZipkinSpan.java | 154 ++++++++++++++++ .../src/test/resources/log4j.properties | 30 +++ 11 files changed, 848 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/README.md ---------------------------------------------------------------------- diff --git a/htrace-zipkin/README.md b/htrace-zipkin/README.md new file mode 100644 index 0000000..6fbee95 --- /dev/null +++ b/htrace-zipkin/README.md @@ -0,0 +1,50 @@ +HTrace Zipkin Receiver +====================== + +Use the HTrace Java library with [Zipkin](https://github.com/openzipkin/zipkin). + +To use, set `"span.receiver.classes", "org.apache.htrace.impl.ZipkinSpanReceiver"` +in the HTraceConfiguration. + + +Transports +---------- + +The Zipkin receiver supports both the Zipkin Scribe (default) and Kafka transports, +controlled through the `zipkin.transport.class` configuration. + +Scribe (Thrift) Transport +------------------------- + +### Configuration + +Configurations are prefixed with `zipkin.scribe`. + +* `zipkin.scribe.hostname`, `localhost` +* `zipkin.scribe.port`, `9410` + +Deprecated (backwards compatibility): + +* `zipkin.collector-hostname`, `localhost` +* `zipkin.collector-port`, `9410` + +Kafka Transport +--------------- + +To use the Kafka transport, add +`"zipkin.transport.class", "org.apache.htrace.impl.KafkaTransport"` +to the configuration. + +### Configuration + +Configurations are prefixed with `zipkin.kafka`. + +* `zipkin.kafka.topic`, `zipkin` + +Producer specific configurations + +* `zipkin.kafka.metadata.broker.list`, `localhost:9092` +* `zipkin.kafka.request.required.acks`, `0` +* `zipkin.kafka.producer.type`, `async` +* `zipkin.kafka.serializer.class`, `kafka.serializer.DefaultEncoder` +* `zipkin.kafka.compression.codec`, `1` http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-zipkin/pom.xml b/htrace-zipkin/pom.xml index 8e2c8a1..77bba02 100644 --- a/htrace-zipkin/pom.xml +++ b/htrace-zipkin/pom.xml @@ -28,6 +28,8 @@ language governing permissions and limitations under the License. --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <slf4j.version>1.5.8</slf4j.version> + <kafka.version>0.8.2.1</kafka.version> + <scala.version>2.11</scala.version> </properties> <build> @@ -58,30 +60,52 @@ language governing permissions and limitations under the License. --> <phase>package</phase> <configuration> <relocations> + <relocation> <pattern>org.apache.commons.logging</pattern> <shadedPattern>org.apache.htrace.commons.logging</shadedPattern> </relocation> + <relocation> <pattern>org.apache.thrift</pattern> <shadedPattern>org.apache.htrace.thrift</shadedPattern> </relocation> + <relocation> <pattern>org.slf4j</pattern> <shadedPattern>org.apache.htrace.slf4j</shadedPattern> </relocation> + <relocation> <pattern>org.apache.commons.codec</pattern> <shadedPattern>org.apache.htrace.commons.codec</shadedPattern> </relocation> + <relocation> <pattern>org.apache.commons.lang</pattern> <shadedPattern>org.apache.htrace.commons.lang</shadedPattern> </relocation> + <relocation> <pattern>org.apache.http</pattern> <shadedPattern>org.apache.htrace.http</shadedPattern> </relocation> + + <relocation> + <pattern>org.apache.kafka</pattern> + <shadedPattern>org.apache.htrace.kafka</shadedPattern> + </relocation> + + <relocation> + <pattern>kafka</pattern> + <shadedPattern>org.apache.htrace.kafka</shadedPattern> + </relocation> + + <relocation> + <pattern>org.I0Itec</pattern> + <shadedPattern>org.apache.htrace.I0Itec</shadedPattern> + </relocation> + </relocations> </configuration> <goals> @@ -153,6 +177,27 @@ language governing permissions and limitations under the License. --> <artifactId>commons-codec</artifactId> <version>1.7</version> </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.version}</artifactId> + <version>${kafka.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_${scala.version}</artifactId> + <version>${kafka.version}</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java b/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java new file mode 100644 index 0000000..dd55bea --- /dev/null +++ b/htrace-zipkin/src/main/java/org/apache/htrace/Transport.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace; + +import org.apache.htrace.core.HTraceConfiguration; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Span data transport interface. + */ +public interface Transport extends Closeable { + + /** + * Open connection to Transport endpoint + * @param conf Transport configuration + * @throws IOException if an I/O error occurs + */ + void open(HTraceConfiguration conf) throws IOException; + + /** + * Checks if the transport in use is open + * @return whether the transport is open + */ + boolean isOpen(); + + /** + * Sends the list of objects to the transport endpoint + * @param spans to be sent + * @throws IOException if an I/O error occurs + */ + void send(List<byte[]> spans) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java new file mode 100644 index 0000000..b352f0c --- /dev/null +++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/KafkaTransport.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.Transport; +import org.apache.htrace.core.HTraceConfiguration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +public class KafkaTransport implements Transport { + + private static final Log LOG = LogFactory.getLog(KafkaTransport.class); + private static final String DEFAULT_TOPIC = "zipkin"; + public static final String TOPIC_KEY = "zipkin.kafka.topic"; + + Producer<byte[], byte[]> producer; + private boolean isOpen = false; + private String topic; + + /** + * Opens a new Kafka transport + * @param conf Transport configuration. Some Kafka producer configurations + * can be passed by prefixing the config key with zipkin.kafka + * (e.g. zipkin.kafka.producer.type for producer.type) + * @throws IOException if an I/O error occurs + * @throws IllegalStateException if transport is already open + */ + @Override + public void open(HTraceConfiguration conf) throws IOException, + IllegalStateException { + if (!isOpen()) { + topic = conf.get(TOPIC_KEY, DEFAULT_TOPIC); + producer = newProducer(conf); + isOpen = true; + } else { + LOG.warn("Attempted to open an already opened transport"); + } + } + + @Override + public boolean isOpen() { + return isOpen; + } + + @Override + public void send(List<byte[]> spans) throws IOException { + + List<KeyedMessage<byte[], byte[]>> entries = new ArrayList<>(spans.size()); + + for (byte[] span : spans) { + entries.add(new KeyedMessage<byte[], byte[]>(topic, span)); + } + if (LOG.isTraceEnabled()) { + LOG.trace("sending " + entries.size() + " entries"); + } + producer.send(entries); + } + + @Override + public void close() throws IOException { + if (isOpen) { + producer.close(); + isOpen = false; + } else { + LOG.warn("Attempted to close already closed transport"); + } + } + + public Producer<byte[], byte[]> newProducer(HTraceConfiguration conf) { + // https://kafka.apache.org/083/configuration.html + Properties producerProps = new Properties(); + // Essential producer configurations + producerProps.put("metadata.broker.list", + conf.get("zipkin.kafka.metadata.broker.list", "localhost:9092")); + producerProps.put("request.required.acks", + conf.get("zipkin.kafka.request.required.acks", "0")); + producerProps.put("producer.type", + conf.get("zipkin.kafka.producer.type", "async")); + producerProps.put("serializer.class", + conf.get("zipkin.kafka.serializer.class", "kafka.serializer.DefaultEncoder")); + producerProps.put("compression.codec", + conf.get("zipkin.kafka.compression.codec", "1")); + + Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(producerProps)); + LOG.info("Connected to Kafka transport \n" + producerProps); + return producer; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java new file mode 100644 index 0000000..0fc7920 --- /dev/null +++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ScribeTransport.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.impl; + +import com.twitter.zipkin.gen.LogEntry; +import com.twitter.zipkin.gen.Scribe; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.Transport; +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ScribeTransport implements Transport { + + /** + * this is used to tell scribe that the entries are for zipkin.. + */ + public static final String CATEGORY = "zipkin"; + + + private static final Log LOG = LogFactory.getLog(ScribeTransport.class); + /** + * Default hostname to fall back on. + */ + private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost"; + public static final String DEPRECATED_HOSTNAME_KEY = "zipkin.collector-hostname"; + public static final String HOSTNAME_KEY = "zipkin.scribe.hostname"; + + /** + * Default collector port. + */ + private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port. + public static final String DEPRECATED_PORT_KEY = "zipkin.collector-port"; + public static final String PORT_KEY = "zipkin.scribe.port"; + + private Scribe.Iface scribe = null; + + @Override + public void open(HTraceConfiguration conf) throws IOException { + if (!isOpen()) { + checkDeprecation(conf, DEPRECATED_HOSTNAME_KEY, HOSTNAME_KEY); + checkDeprecation(conf, DEPRECATED_PORT_KEY, PORT_KEY); + + String collectorHostname = conf.get(HOSTNAME_KEY, + conf.get(DEPRECATED_HOSTNAME_KEY, + DEFAULT_COLLECTOR_HOSTNAME)); + int collectorPort = conf.getInt(PORT_KEY, + conf.getInt(DEPRECATED_PORT_KEY, + DEFAULT_COLLECTOR_PORT)); + scribe = newScribe(collectorHostname, collectorPort); + LOG.info("Opened transport " + collectorHostname + ":" + collectorPort); + } else { + LOG.warn("Attempted to open an already opened transport"); + } + } + + private void checkDeprecation(HTraceConfiguration conf, String deprecatedKey, + String newKey) { + if (conf.get(deprecatedKey) != null) { + LOG.warn("Configuration \"" + deprecatedKey + "\" is deprecated. Use \"" + + newKey + "\" instead."); + } + } + + @Override + public boolean isOpen() { + return scribe != null + && ((Scribe.Client) scribe).getInputProtocol().getTransport().isOpen(); + } + + /** + * The Scribe client which is used for rpc writes a list of + * LogEntry objects, so the span objects are first transformed into LogEntry objects before + * sending to the zipkin-collector. + * + * Here is a little ascii art which shows the above transformation: + * <pre> + * +------------+ +------------+ +------------+ +-----------------+ + * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector| + * +------------+ +------------+ +------------+ (Scribe RPC) +-----------------+ + * </pre> + * @param spans to be sent. The raw bytes are being sent. + * @throws IOException + */ + @Override + public void send(List<byte[]> spans) throws IOException { + + ArrayList<LogEntry> entries = new ArrayList<LogEntry>(spans.size()); + for (byte[] span : spans) { + entries.add(new LogEntry(CATEGORY, Base64.encodeBase64String(span))); + } + + try { + if (LOG.isTraceEnabled()) { + LOG.trace("sending " + entries.size() + " entries"); + } + scribe.Log(entries); // TODO (clehene) should we instead interpret the return? + } catch (TException e) { + throw new IOException(e); + } + + } + + @Override + public void close() throws IOException { + if (scribe != null) { + ((Scribe.Client) scribe).getInputProtocol().getTransport().close(); + scribe = null; + LOG.info("Closed transport"); + } else { + LOG.warn("Attempted to close an already closed transport"); + } + } + + private Scribe.Iface newScribe(String collectorHostname, + int collectorPort) + throws IOException { + + TTransport transport = new TFramedTransport( + new TSocket(collectorHostname, collectorPort)); + try { + transport.open(); + } catch (TTransportException e) { + throw new IOException(e); + } + TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); + TProtocol protocol = factory.getProtocol(transport); + return new Scribe.Client(protocol); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java index c106fa8..2dfe5a6 100644 --- a/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java +++ b/htrace-zipkin/src/main/java/org/apache/htrace/impl/ZipkinSpanReceiver.java @@ -17,12 +17,10 @@ package org.apache.htrace.impl; -import com.twitter.zipkin.gen.LogEntry; -import com.twitter.zipkin.gen.Scribe; - -import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.htrace.Transport; import org.apache.htrace.core.HTraceConfiguration; import org.apache.htrace.core.Span; import org.apache.htrace.core.SpanReceiver; @@ -30,27 +28,25 @@ import org.apache.htrace.zipkin.HTraceToZipkinConverter; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * Zipkin is an open source tracing library. This span receiver acts as a bridge between HTrace and @@ -58,21 +54,14 @@ import java.util.concurrent.atomic.AtomicBoolean; * <p/> * HTrace spans are queued into a blocking queue. From there background worker threads will * batch the spans together and then send them through to a Zipkin collector. + * + * Pluggable Zipkin transports are supported through the "zipkin.transport.class" configuration + * Implementations for Scribe (ScribeTransport) (default) and Kafka (KafkaTransport) are available + * */ public class ZipkinSpanReceiver extends SpanReceiver { - private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class); - /** - * Default hostname to fall back on. - */ - private static final String DEFAULT_COLLECTOR_HOSTNAME = "localhost"; - public static final String HOSTNAME_KEY = "zipkin.collector-hostname"; - - /** - * Default collector port. - */ - private static final int DEFAULT_COLLECTOR_PORT = 9410; // trace collector default port. - public static final String PORT_KEY = "zipkin.collector-port"; + private static final Log LOG = LogFactory.getLog(ZipkinSpanReceiver.class); /** * Default number of threads to use. @@ -81,11 +70,6 @@ public class ZipkinSpanReceiver extends SpanReceiver { public static final String NUM_THREAD_KEY = "zipkin.num-threads"; /** - * this is used to tell scribe that the entries are for zipkin.. - */ - private static final String CATEGORY = "zipkin"; - - /** * Whether the service which is traced is in client or a server mode. It is used while creating * the Endpoint. */ @@ -106,6 +90,14 @@ public class ZipkinSpanReceiver extends SpanReceiver { */ private static final int MAX_ERRORS = 10; + private static final String DEFAULT_TRANSPORT_CLASS = "org.apache.htrace.impl.ScribeTransport"; + public static final String TRANSPORT_CLASS_KEY = "zipkin.transport.class"; + + /** + * The transport that the spans will be sent trough + */ + private Transport transport; + /** * The queue that will get all HTrace spans that are to be sent. */ @@ -146,20 +138,37 @@ public class ZipkinSpanReceiver extends SpanReceiver { private HTraceToZipkinConverter converter; private ExecutorService service; private HTraceConfiguration conf; - private String collectorHostname; - private int collectorPort; public ZipkinSpanReceiver(HTraceConfiguration conf) { + this.transport = createTransport(conf); this.queue = new ArrayBlockingQueue<Span>(1000); this.protocolFactory = new TBinaryProtocol.Factory(); configure(conf); } + private void logAndThrow(Throwable exception) { + LOG.error(ExceptionUtils.getStackTrace(exception)); + throw new RuntimeException(exception); + } + + protected Transport createTransport(HTraceConfiguration conf) { + ClassLoader classLoader = Builder.class.getClassLoader(); + String className = conf.get(TRANSPORT_CLASS_KEY, DEFAULT_TRANSPORT_CLASS); + Transport transport = null; + try { + Class<?> cls = classLoader.loadClass(className); + transport = (Transport)cls.newInstance(); + } catch (ClassNotFoundException + | InstantiationException + | IllegalAccessException e) { + logAndThrow(e); + } + return transport; + } + private void configure(HTraceConfiguration conf) { this.conf = conf; - this.collectorHostname = conf.get(HOSTNAME_KEY, DEFAULT_COLLECTOR_HOSTNAME); - this.collectorPort = conf.getInt(PORT_KEY, DEFAULT_COLLECTOR_PORT); // initialize the endpoint. This endpoint is used while writing the Span. initConverter(); @@ -186,26 +195,23 @@ public class ZipkinSpanReceiver extends SpanReceiver { InetAddress tracedServiceHostname = null; // Try and get the hostname. If it's not configured try and get the local hostname. try { + //TODO (clehene) extract conf to constant + //TODO (clehene) has this been deprecated? String host = conf.get("zipkin.traced-service-hostname", InetAddress.getLocalHost().getHostAddress()); - tracedServiceHostname = InetAddress.getByName(host); } catch (UnknownHostException e) { LOG.error("Couldn't get the localHost address", e); } short tracedServicePort = (short) conf.getInt("zipkin.traced-service-port", -1); byte[] address = tracedServiceHostname != null - ? tracedServiceHostname.getAddress() : DEFAULT_COLLECTOR_HOSTNAME.getBytes(); + ? tracedServiceHostname.getAddress() : InetAddress.getLoopbackAddress().getAddress(); int ipv4 = ByteBuffer.wrap(address).getInt(); this.converter = new HTraceToZipkinConverter(ipv4, tracedServicePort); } private class WriteSpanRunnable implements Runnable { - /** - * scribe client to push zipkin spans - */ - private Scribe.Iface scribe = null; private final ByteArrayOutputStream baos; private final TProtocol streamProtocol; @@ -215,16 +221,15 @@ public class ZipkinSpanReceiver extends SpanReceiver { } /** - * This runnable converts a HTrace span to a Zipkin span and sends it across the zipkin - * collector as a thrift object. The scribe client which is used for rpc writes a list of - * LogEntry objects, so the span objects are first transformed into LogEntry objects before - * sending to the zipkin-collector. + * + * This runnable converts an HTrace span to a Zipkin span and sends it across the transport + * as a Thrift object. * <p/> * Here is a little ascii art which shows the above transformation: * <pre> - * +------------+ +------------+ +------------+ +-----------------+ - * | HTrace Span|-->|Zipkin Span |-->| (LogEntry) | ===========> | Zipkin Collector| - * +------------+ +------------+ +------------+ (Scribe rpc) +-----------------+ + * +------------+ +------------+ +-----------------+ + * | HTrace Span|-->|Zipkin Span | ===========> | Zipkin Collector| + * +------------+ +------------+ (transport) +-----------------+ * </pre> */ @Override @@ -236,6 +241,7 @@ public class ZipkinSpanReceiver extends SpanReceiver { while (running.get() || queue.size() > 0) { Span firstSpan = null; + //TODO (clenene) the following code (try / catch) is duplicated in / from FlumeSpanReceiver try { // Block for up to a second. to try and get a span. // We only block for a little bit in order to notice if the running value has changed @@ -256,13 +262,17 @@ public class ZipkinSpanReceiver extends SpanReceiver { if (dequeuedSpans.isEmpty()) continue; - // If this is the first time through or there was an error re-connect - if (scribe == null) { - startClient(); + if (!transport.isOpen()) { + try { + transport.open(conf); + } catch (Throwable e) { + logAndThrow(e); + } } + // Create a new list every time through so that the list doesn't change underneath // thrift as it's sending. - List<LogEntry> entries = new ArrayList<LogEntry>(dequeuedSpans.size()); + List<byte[]> entries = new ArrayList<>(dequeuedSpans.size()); try { // Convert every de-queued span for (Span htraceSpan : dequeuedSpans) { @@ -273,76 +283,57 @@ public class ZipkinSpanReceiver extends SpanReceiver { // Write the span to a BAOS zipkinSpan.write(streamProtocol); - // Do Base64 encoding and put the string into a log entry. - LogEntry logEntry = - new LogEntry(CATEGORY, Base64.encodeBase64String(baos.toByteArray())); - entries.add(logEntry); + entries.add(baos.toByteArray()); } // Send the entries - scribe.Log(entries); + transport.send(entries); + // clear the list for the next time through. dequeuedSpans.clear(); // reset the error counter. errorCount = 0; } catch (Exception e) { - LOG.error("Error when writing to the zipkin collector: " + - collectorHostname + ":" + collectorPort, e); - - errorCount += 1; - // If there have been ten errors in a row start dropping things. - if (errorCount < MAX_ERRORS) { - try { - queue.addAll(dequeuedSpans); - } catch (IllegalStateException ex) { - LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full"); - } - } - - closeClient(); - try { - // Since there was an error sleep just a little bit to try and allow the - // zipkin collector some time to recover. - Thread.sleep(500); - } catch (InterruptedException e1) { - // Ignored - } + errorCount = handleException(dequeuedSpans, errorCount, e); } } closeClient(); } - /** - * Close out the connection. - */ - private void closeClient() { - // close out the transport. - if (scribe != null && scribe instanceof Scribe.Client) { - ((Scribe.Client) scribe).getInputProtocol().getTransport().close(); - scribe = null; + private long handleException(List<Span> dequeuedSpans, long errorCount, Exception e) { + LOG.error("Error when writing to the zipkin transport: " + transport, e); + + errorCount += 1; + // If there have been ten errors in a row start dropping things. + if (errorCount < MAX_ERRORS) { + try { + queue.addAll(dequeuedSpans); + } catch (IllegalStateException ex) { + LOG.error("Drop " + dequeuedSpans.size() + " span(s) because queue is full"); + } + } + closeClient(); + try { + // Since there was an error sleep just a little bit to try and allow the + // zipkin collector some time to recover. + Thread.sleep(500); + } catch (InterruptedException e1) { + // Ignored } + return errorCount; } /** - * Re-connect to Zipkin. + * Close out the connection. */ - private void startClient() { - if (this.scribe == null) { - this.scribe = newScribe(); + private void closeClient(){ + try { + transport.close(); + } catch (IOException e) { + LOG.warn("Failed to close transport", e); } } - } - // Override for testing - Scribe.Iface newScribe() { - TTransport transport = new TFramedTransport(new TSocket(collectorHostname, collectorPort)); - try { - transport.open(); - } catch (TTransportException e) { - e.printStackTrace(); - } - TProtocol protocol = protocolFactory.getProtocol(transport); - return new Scribe.Client(protocol); } /** http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java deleted file mode 100644 index f166d35..0000000 --- a/htrace-zipkin/src/test/java/org/apache/htrace/TestHTraceSpanToZipkinSpan.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.htrace.zipkin; - -import com.twitter.zipkin.gen.zipkinCoreConstants; - -import org.apache.htrace.core.HTraceConfiguration; -import org.apache.htrace.core.MilliSpan; -import org.apache.htrace.core.POJOSpanReceiver; -import org.apache.htrace.core.Span; -import org.apache.htrace.core.SpanId; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collection; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not. - */ -public class TestHTraceSpanToZipkinSpan { - private static final String ROOT_SPAN_DESC = "ROOT"; - - @Test - public void testHTraceToZipkin() throws IOException { - Span rootSpan = new MilliSpan.Builder(). - description(ROOT_SPAN_DESC). - parents(new SpanId[] { } ). - spanId(new SpanId(100, 100)). - tracerId("test"). - begin(System.currentTimeMillis()). - build(); - Span innerOne = rootSpan.child("Some good work"); - Span innerTwo = innerOne.child("Some more good work"); - innerTwo.stop(); - innerOne.stop(); - rootSpan.addKVAnnotation("foo", "bar"); - rootSpan.addTimelineAnnotation("timeline"); - rootSpan.stop(); - - for (Span s : new Span[] {rootSpan, innerOne, innerTwo}) { - com.twitter.zipkin.gen.Span zs = - new HTraceToZipkinConverter(12345, (short) 12).convert(s); - assertSpansAreEquivalent(s, zs); - } - } - - @Test - public void testHTraceAnnotationTimestamp() throws IOException, InterruptedException { - - String tracerId = "testHTraceAnnotationTimestamp"; - long startTime = System.currentTimeMillis() * 1000; - Span ms = new MilliSpan.Builder(). - description(tracerId).parents(new SpanId[] { }). - spanId(new SpanId(2L, 2L)). - tracerId(tracerId). - begin(System.currentTimeMillis()). - build(); - - Thread.sleep(500); - long annoStartTime = System.currentTimeMillis() * 1000; - Thread.sleep(500); - ms.addTimelineAnnotation("anno"); - Thread.sleep(500); - long annoEndTime = System.currentTimeMillis() * 1000; - Thread.sleep(500); - ms.stop(); - long endTime = System.currentTimeMillis() * 1000; - - - - com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms); - - // Check to make sure that all times are in the proper order. - for (com.twitter.zipkin.gen.Annotation annotation : zs.getAnnotations()) { - // CS and SR should be before the annotation - // the annotation should be in between annotationStart and annotationEnd times - // SS and CR should be after annotationEnd and before endtime. - if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_SEND) - || annotation.getValue().equals(zipkinCoreConstants.SERVER_RECV)) { - assertTrue(startTime <= annotation.getTimestamp()); - assertTrue(annotation.getTimestamp() <= annoStartTime); - } else if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_RECV) - || annotation.getValue().equals(zipkinCoreConstants.SERVER_SEND)) { - assertTrue(annoEndTime <= annotation.getTimestamp()); - assertTrue(annotation.getTimestamp() <= endTime); - } else { - assertTrue(annoStartTime <= annotation.getTimestamp()); - assertTrue(annotation.getTimestamp() <= annoEndTime); - assertTrue(annotation.getTimestamp() <= endTime); - } - } - } - - @Test - public void testHTraceDefaultPort() throws IOException { - MilliSpan ms = new MilliSpan.Builder().description("test"). - parents(new SpanId[] { new SpanId(2L, 2L) }). - spanId(new SpanId(2L, 3L)). - tracerId("hmaster"). - begin(System.currentTimeMillis()). - build(); - com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms); - for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) { - assertEquals((short)60000, annotation.getHost().getPort()); - } - - // make sure it's all lower cased - ms = new MilliSpan.Builder().description("test"). - parents(new SpanId[] {new SpanId(2, 2)}). - spanId(new SpanId(2, 3)). - tracerId("HregIonServer"). - begin(System.currentTimeMillis()). - build(); - zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms); - for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) { - assertEquals((short)60020, annotation.getHost().getPort()); - } - } - - private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) { - assertTrue("zipkin doesn't support multiple parents to a single span.", - s.getParents().length <= 1); - if (s.getParents().length == 1) { - assertEquals(s.getParents()[0].getLow(), zs.getParent_id()); - } - assertEquals(s.getSpanId().getLow(), zs.getId()); - Assert.assertNotNull(zs.getAnnotations()); - if (ROOT_SPAN_DESC.equals(zs.getName())) { - assertEquals(5, zs.getAnnotations().size());// two start, two stop + one timeline annotation - assertEquals(1, zs.getBinary_annotations().size()); - } else { - assertEquals(4, zs.getAnnotations().size()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java index 6595772..0a22d68 100644 --- a/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java +++ b/htrace-zipkin/src/test/java/org/apache/htrace/impl/TestZipkinSpanReceiver.java @@ -17,14 +17,7 @@ package org.apache.htrace.impl; -import com.twitter.zipkin.gen.LogEntry; -import com.twitter.zipkin.gen.ResultCode; -import com.twitter.zipkin.gen.Scribe; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import org.apache.commons.codec.binary.Base64; +import org.apache.htrace.Transport; import org.apache.htrace.core.AlwaysSampler; import org.apache.htrace.core.HTraceConfiguration; import org.apache.htrace.core.MilliSpan; @@ -40,17 +33,22 @@ import org.apache.thrift.transport.TMemoryBuffer; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + public class TestZipkinSpanReceiver { - private Tracer newTracer(final Scribe.Iface scribe) { + private Tracer newTracer(final Transport transport) { TracerPool pool = new TracerPool("newTracer"); pool.addReceiver(new ZipkinSpanReceiver(HTraceConfiguration.EMPTY) { - @Override Scribe.Iface newScribe() { - return scribe; + @Override + protected Transport createTransport(HTraceConfiguration conf) { + return transport; } }); - return new Tracer.Builder(). - name("ZipkinTracer"). + return new Tracer.Builder("ZipkinTracer"). tracerPool(pool). conf(HTraceConfiguration.fromKeyValuePairs( "sampler.classes", AlwaysSampler.class.getName() @@ -60,8 +58,8 @@ public class TestZipkinSpanReceiver { @Test public void testSimpleTraces() throws IOException, InterruptedException { - FakeZipkinScribe scribe = new FakeZipkinScribe(); - Tracer tracer = newTracer(scribe); + FakeZipkinTransport transport = new FakeZipkinTransport(); + Tracer tracer = newTracer(transport); Span rootSpan = new MilliSpan.Builder(). description("root"). spanId(new SpanId(100, 100)). @@ -72,43 +70,26 @@ public class TestZipkinSpanReceiver { TraceScope innerOne = tracer.newScope("innerOne"); TraceScope innerTwo = tracer.newScope("innerTwo"); innerTwo.close(); - Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("innerTwo")); + Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("innerTwo")); innerOne.close(); - Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("innerOne")); + Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("innerOne")); rootSpan.addKVAnnotation("foo", "bar"); rootSpan.addTimelineAnnotation("timeline"); rootScope.close(); - Assert.assertTrue(scribe.nextMessageAsSpan().getName().contains("root")); + Assert.assertTrue(transport.nextMessageAsSpan().getName().contains("root")); tracer.close(); } @Test public void testConcurrency() throws IOException { - Scribe.Iface alwaysOk = new Scribe.Iface() { - @Override - public ResultCode Log(List<LogEntry> messages) throws TException { - return ResultCode.OK; - } - }; - Tracer tracer = newTracer(alwaysOk); - TraceCreator traceCreator = new TraceCreator(tracer); - traceCreator.createThreadedTrace(); - } - - @Test - public void testResilience() throws IOException { - Scribe.Iface alwaysTryLater = new Scribe.Iface() { - @Override - public ResultCode Log(List<LogEntry> messages) throws TException { - return ResultCode.TRY_LATER; - } - }; - Tracer tracer = newTracer(alwaysTryLater); + Tracer tracer = newTracer(new FakeZipkinTransport(){ + @Override public void send(List<byte[]> spans) throws IOException { /*do nothing*/ } + }); TraceCreator traceCreator = new TraceCreator(tracer); traceCreator.createThreadedTrace(); } - private static class FakeZipkinScribe implements Scribe.Iface { + private static class FakeZipkinTransport implements Transport { private final BlockingQueue<com.twitter.zipkin.gen.Span> receivedSpans = new ArrayBlockingQueue<com.twitter.zipkin.gen.Span>(1); @@ -117,19 +98,35 @@ public class TestZipkinSpanReceiver { return receivedSpans.take(); } + + @Override + public void open(HTraceConfiguration conf) throws IOException { + + } + + @Override + public boolean isOpen() { + return false; + } + @Override - public ResultCode Log(List<LogEntry> messages) throws TException { - for (LogEntry message : messages) { - Assert.assertEquals("zipkin", message.category); - byte[] bytes = Base64.decodeBase64(message.message); - - TMemoryBuffer transport = new TMemoryBuffer(bytes.length); - transport.write(bytes); - com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span(); - zSpan.read(new TBinaryProtocol(transport)); - receivedSpans.add(zSpan); + public void send(List<byte[]> spans) throws IOException { + for (byte[] message : spans) { + TMemoryBuffer transport = new TMemoryBuffer(message.length); + try { + transport.write(message); + com.twitter.zipkin.gen.Span zSpan = new com.twitter.zipkin.gen.Span(); + zSpan.read(new TBinaryProtocol(transport)); + receivedSpans.add(zSpan); + } catch (TException e) { + throw new IOException(e); + } } - return ResultCode.OK; + } + + @Override + public void close() throws IOException { + } } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java new file mode 100644 index 0000000..cb50032 --- /dev/null +++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/ITZipkinReceiver.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.zipkin; + + +import com.twitter.zipkin.gen.Span; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.apache.htrace.core.TracerPool; +import org.apache.htrace.impl.KafkaTransport; +import org.apache.htrace.impl.ZipkinSpanReceiver; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.MockTime; +import kafka.utils.TestUtils; +import kafka.utils.TestZKUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.zk.EmbeddedZookeeper; +import scala.collection.JavaConversions; +import scala.collection.mutable.Buffer; + +public class ITZipkinReceiver { + + @Test + public void testKafkaTransport() throws Exception { + + String topic = "zipkin"; + // Kafka setup + EmbeddedZookeeper zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect()); + ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$); + Properties props = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), false); + KafkaConfig config = new KafkaConfig(props); + KafkaServer kafkaServer = TestUtils.createServer(config, new MockTime()); + + Buffer<KafkaServer> servers = JavaConversions.asScalaBuffer(Collections.singletonList(kafkaServer)); + TestUtils.createTopic(zkClient, topic, 1, 1, servers, new Properties()); + zkClient.close(); + TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 5000); + + // HTrace + HTraceConfiguration hTraceConfiguration = HTraceConfiguration.fromKeyValuePairs( + "sampler.classes", "AlwaysSampler", + "span.receiver.classes", ZipkinSpanReceiver.class.getName(), + "zipkin.kafka.metadata.broker.list", config.advertisedHostName() + ":" + config.advertisedPort(), + "zipkin.kafka.topic", topic, + ZipkinSpanReceiver.TRANSPORT_CLASS_KEY, KafkaTransport.class.getName() + ); + + final Tracer tracer = new Tracer.Builder("test-tracer") + .tracerPool(new TracerPool("test-tracer-pool")) + .conf(hTraceConfiguration) + .build(); + + String scopeName = "test-kafka-transport-scope"; + TraceScope traceScope = tracer.newScope(scopeName); + traceScope.close(); + tracer.close(); + + // Kafka consumer + Properties consumerProps = new Properties(); + consumerProps.put("zookeeper.connect", props.getProperty("zookeeper.connect")); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testing.group"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest"); + ConsumerConnector connector = + kafka.consumer.Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProps)); + Map<String, Integer> topicCountMap = new HashMap<>(); + topicCountMap.put(topic, 1); + Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap); + ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator(); + + // Test + Assert.assertTrue("We should have one message in Kafka", it.hasNext()); + Span span = new Span(); + new TDeserializer(new TBinaryProtocol.Factory()).deserialize(span, it.next().message()); + Assert.assertEquals("The span name should match our scope description", span.getName(), scopeName); + + kafkaServer.shutdown(); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java new file mode 100644 index 0000000..f166d35 --- /dev/null +++ b/htrace-zipkin/src/test/java/org/apache/htrace/zipkin/TestHTraceSpanToZipkinSpan.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.htrace.zipkin; + +import com.twitter.zipkin.gen.zipkinCoreConstants; + +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.MilliSpan; +import org.apache.htrace.core.POJOSpanReceiver; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Creates HTrace and then convert it to Zipkin trace and checks whether it is a valid span or not. + */ +public class TestHTraceSpanToZipkinSpan { + private static final String ROOT_SPAN_DESC = "ROOT"; + + @Test + public void testHTraceToZipkin() throws IOException { + Span rootSpan = new MilliSpan.Builder(). + description(ROOT_SPAN_DESC). + parents(new SpanId[] { } ). + spanId(new SpanId(100, 100)). + tracerId("test"). + begin(System.currentTimeMillis()). + build(); + Span innerOne = rootSpan.child("Some good work"); + Span innerTwo = innerOne.child("Some more good work"); + innerTwo.stop(); + innerOne.stop(); + rootSpan.addKVAnnotation("foo", "bar"); + rootSpan.addTimelineAnnotation("timeline"); + rootSpan.stop(); + + for (Span s : new Span[] {rootSpan, innerOne, innerTwo}) { + com.twitter.zipkin.gen.Span zs = + new HTraceToZipkinConverter(12345, (short) 12).convert(s); + assertSpansAreEquivalent(s, zs); + } + } + + @Test + public void testHTraceAnnotationTimestamp() throws IOException, InterruptedException { + + String tracerId = "testHTraceAnnotationTimestamp"; + long startTime = System.currentTimeMillis() * 1000; + Span ms = new MilliSpan.Builder(). + description(tracerId).parents(new SpanId[] { }). + spanId(new SpanId(2L, 2L)). + tracerId(tracerId). + begin(System.currentTimeMillis()). + build(); + + Thread.sleep(500); + long annoStartTime = System.currentTimeMillis() * 1000; + Thread.sleep(500); + ms.addTimelineAnnotation("anno"); + Thread.sleep(500); + long annoEndTime = System.currentTimeMillis() * 1000; + Thread.sleep(500); + ms.stop(); + long endTime = System.currentTimeMillis() * 1000; + + + + com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms); + + // Check to make sure that all times are in the proper order. + for (com.twitter.zipkin.gen.Annotation annotation : zs.getAnnotations()) { + // CS and SR should be before the annotation + // the annotation should be in between annotationStart and annotationEnd times + // SS and CR should be after annotationEnd and before endtime. + if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_SEND) + || annotation.getValue().equals(zipkinCoreConstants.SERVER_RECV)) { + assertTrue(startTime <= annotation.getTimestamp()); + assertTrue(annotation.getTimestamp() <= annoStartTime); + } else if (annotation.getValue().equals(zipkinCoreConstants.CLIENT_RECV) + || annotation.getValue().equals(zipkinCoreConstants.SERVER_SEND)) { + assertTrue(annoEndTime <= annotation.getTimestamp()); + assertTrue(annotation.getTimestamp() <= endTime); + } else { + assertTrue(annoStartTime <= annotation.getTimestamp()); + assertTrue(annotation.getTimestamp() <= annoEndTime); + assertTrue(annotation.getTimestamp() <= endTime); + } + } + } + + @Test + public void testHTraceDefaultPort() throws IOException { + MilliSpan ms = new MilliSpan.Builder().description("test"). + parents(new SpanId[] { new SpanId(2L, 2L) }). + spanId(new SpanId(2L, 3L)). + tracerId("hmaster"). + begin(System.currentTimeMillis()). + build(); + com.twitter.zipkin.gen.Span zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms); + for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) { + assertEquals((short)60000, annotation.getHost().getPort()); + } + + // make sure it's all lower cased + ms = new MilliSpan.Builder().description("test"). + parents(new SpanId[] {new SpanId(2, 2)}). + spanId(new SpanId(2, 3)). + tracerId("HregIonServer"). + begin(System.currentTimeMillis()). + build(); + zs = new HTraceToZipkinConverter(12345, (short) -1).convert(ms); + for (com.twitter.zipkin.gen.Annotation annotation:zs.getAnnotations()) { + assertEquals((short)60020, annotation.getHost().getPort()); + } + } + + private void assertSpansAreEquivalent(Span s, com.twitter.zipkin.gen.Span zs) { + assertTrue("zipkin doesn't support multiple parents to a single span.", + s.getParents().length <= 1); + if (s.getParents().length == 1) { + assertEquals(s.getParents()[0].getLow(), zs.getParent_id()); + } + assertEquals(s.getSpanId().getLow(), zs.getId()); + Assert.assertNotNull(zs.getAnnotations()); + if (ROOT_SPAN_DESC.equals(zs.getName())) { + assertEquals(5, zs.getAnnotations().size());// two start, two stop + one timeline annotation + assertEquals(1, zs.getBinary_annotations().size()); + } else { + assertEquals(4, zs.getAnnotations().size()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/9ebdafb4/htrace-zipkin/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/htrace-zipkin/src/test/resources/log4j.properties b/htrace-zipkin/src/test/resources/log4j.properties new file mode 100644 index 0000000..564a77a --- /dev/null +++ b/htrace-zipkin/src/test/resources/log4j.properties @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# By default, everything goes to console and file +log4j.rootLogger=WARN, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n +log4j.appender.A1.ImmediateFlush=true + + +log4j.logger.kafka.utils=WARN, A1 +log4j.logger.kafka.consumer=WARN, A1 +log4j.logger.kafka.producer=WARN, A1 + +log4j.logger.org.apache.htrace=INFO, A1 \ No newline at end of file
