Repository: incubator-htrace Updated Branches: refs/heads/master 97b4fc6f7 -> 86044c6f2
HTRACE-51. htraced java REST client (a.k.a java SpanReceiver for htraced) (stack via cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/a521b558 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/a521b558 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/a521b558 Branch: refs/heads/master Commit: a521b5589e57f8323f8e6d2ab619614f919b1db4 Parents: 97b4fc6 Author: Colin P. Mccabe <[email protected]> Authored: Mon Jan 19 22:25:29 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Mon Jan 19 22:25:29 2015 -0800 ---------------------------------------------------------------------- htrace-core/pom.xml | 12 +- htrace-htraced/pom.xml | 150 ++++++++++ .../apache/htrace/impl/HTracedRESTReceiver.java | 285 +++++++++++++++++++ pom.xml | 21 +- 4 files changed, 456 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a521b558/htrace-core/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-core/pom.xml b/htrace-core/pom.xml index 5c37dc8..d3b7f1b 100644 --- a/htrace-core/pom.xml +++ b/htrace-core/pom.xml @@ -145,20 +145,18 @@ language governing permissions and limitations under the License. --> <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <!-- core specific deps. --> - <dependency> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> - <version>2.4.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - <version>2.4.0</version> + </dependency> + <!-- core specific deps. 
--> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a521b558/htrace-htraced/pom.xml ---------------------------------------------------------------------- diff --git a/htrace-htraced/pom.xml b/htrace-htraced/pom.xml new file mode 100644 index 0000000..de0600c --- /dev/null +++ b/htrace-htraced/pom.xml @@ -0,0 +1,150 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor +license agreements. See the NOTICE file distributed with this work for additional +information regarding copyright ownership. The ASF licenses this file to +You under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required +by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, either express or implied. See the License for the specific +language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>htrace-htraced</artifactId> + <packaging>jar</packaging> + + <parent> + <artifactId>htrace</artifactId> + <groupId>org.apache.htrace</groupId> + <version>3.1.0-incubating</version> + <relativePath>..</relativePath> + </parent> + + <name>htrace-htraced</name> + <description>HTraced and HTraced clients</description> + <url>http://incubator.apache.org/projects/htrace.html</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <build> + <plugins> + <plugin> + <!--Make it so assembly:single does nothing in here--> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <skipAssembly>true</skipAssembly> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-gpg-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + <plugin> + <!-- explicitly define maven-deploy-plugin after other to force exec order --> + <artifactId>maven-deploy-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + </plugin> + <!--Move this to top-level. 
These shade patterns are common across components + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <configuration> + <relocations> + <relocation> + <pattern>org.apache.commons.logging</pattern> + <shadedPattern>org.apache.htrace.commons.logging</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>org.apache.htrace.fasterxml.jackson</shadedPattern> + </relocation> + <relocation> + <pattern>org.eclipse.jetty</pattern> + <shadedPattern>org.apache.htrace.jetty</shadedPattern> + </relocation> + </relocations> + </configuration> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <dependencies> + <!-- Module deps. --> + <dependency> + <groupId>org.apache.htrace</groupId> + <artifactId>htrace-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.htrace</groupId> + <artifactId>htrace-core</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <!-- Global deps. --> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <!-- htraced rest client deps. --> + <!--Is this too much? Pulls down jetty-http, jetty-server, jetty-io + This is new-style jetty client, jetty9 and jdk7 required. + It can do async but we will use it synchronously at first. 
+ Has nice tutorial: http://www.eclipse.org/jetty/documentation/9.1.5.v20140505/http-client-api.html + --> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-client</artifactId> + <version>9.2.6.v20141205</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a521b558/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java new file mode 100644 index 0000000..45e433f --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.htrace.impl; + + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.HTraceConfiguration; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; + +/** + * A {@link SpanReceiver} that passes Spans to htraced via REST. Implementation minimizes + * dependencies and aims for small footprint since this client will be the guest of another, + * the process traced. + * + * <p>Logs via commons-logging. Uses jetty client. Jetty has its own logging (TODO: connect + * jetty logging to commons-logging, see https://issues.apache.org/jira/browse/HADOOP-6807 + * and http://blogs.bytecode.com.au/glen/2005/06/21/getting-your-logging-working-in-jetty.html). + * + * <p>This client depends on the REST defined in <code>rest.go</code> in the htraced REST server. + * For example, a <code>GET</code> on <code>/serverInfo</code> returns the htraced server info. + * + * <p>Create an instance by doing: + * <code>SpanReceiver receiver = new HTracedRESTReceiver(conf);</code> where conf is an instance + * of {@link HTraceConfiguration}. See the public keys defined below for what we will look for in the + * configuration. 
For example, {@link #HTRACE_REST_CLIENT_HOSTNAME} and + * {@link #HTRACE_REST_CLIENT_PORT}. + * Then call <code>receiver.receiveSpan(Span);</code> to send spans to an htraced instance. + * This method returns immediately. It sends the spans in the background. + * + * <p>TODO: How to be more dependent on rest.go so we break if it changes? + * TODO: Add tests. Add start/stop htraced. + * TODO: Shading works? + * TODO: Add lazy start; don't start background thread till a span gets queued. + * TODO: Add some metrics; how many times we've run, how many spans and what size we've sent. + */ +public class HTracedRESTReceiver implements SpanReceiver { + private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class); + + // TODO: Take process name and add this to user agent? Would help debugging? + private final HttpClient httpClient; + + /** + * REST URL to use writing Spans. + */ + private final String writeSpansRESTURL; + + /** + * Runs background task to do the REST POST. + * TODO: Make period configurable. TODO: Make instantiation lazy. + */ + private final ScheduledExecutorService scheduler; + + /** + * Keep around reference so we can cancel on close any running scheduled task. + */ + private final ScheduledFuture<?> scheduledFuture; + + /** + * Timeout in milliseconds. + */ + public static final String HTRACE_REST_CLIENT_TIMEOUT = "htrace.client.rest.timeout.ms"; + private static final int HTRACE_REST_CLIENT_TIMEOUT_DEFAULT = 60000; + + /** + * Port of the remote htraced we are to talk to. + */ + public static final String HTRACE_REST_CLIENT_PORT = "htrace.client.rest.htraced.port"; + private static final int HTRACE_REST_CLIENT_PORT_DEFAULT = 9095; + + /** + * Hostname of the remote htraced we are to talk to. + */ + public static final String HTRACE_REST_CLIENT_HOSTNAME = "htrace.client.rest.htraced.hostname"; + private static final String HTRACE_REST_CLIENT_HOSTNAME_DEFAULT = "localhost"; + + /** + * Size of the queue to accumulate spans in. 
+ * Cleared by the background thread that does the REST POST to htraced. + */ + public static final String HTRACE_REST_CLIENT_Q_CAPACITY = "htrace.client.rest.q.capacity"; + private static final int HTRACE_REST_CLIENT_Q_CAPACITY_DEFAULT = 1000000; + + /** + * Period at which the background thread that does the REST POST to htraced runs at in seconds. + */ + public static final String HTRACE_REST_CLIENT_PERIOD_SECONDS = + "htrace.client.reset.period.seconds"; + private static final int HTRACE_REST_CLIENT_PERIOD_SECONDS_DEFAULT = 1; + + /** + * Maximum spans to post to htraced at a time. + */ + public static final String HTRACE_REST_CLIENT_MAX_AT_A_TIME = + "htrace.client.rest.max.spans.at.a.time"; + private static final int HTRACE_REST_CLIENT_MAX_AT_A_TIME_DEFAULT = 1000; + + /** + * Simple bounded queue to hold spans between periodic runs of the httpclient. + * TODO: Make size configurable. + */ + private final Queue<Span> queue; + + /** + * Keep last time we logged we were at capacity; used to prevent flooding of logs with + * "at capacity" messages. + */ + private volatile long lastAtCapacityWarningLog = 0L; + + public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception { + this.httpClient = new HttpClient(); + this.httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, + this.getClass().getSimpleName())); + // Use same timeout for connection and idle for now. + int timeout = conf.getInt(HTRACE_REST_CLIENT_TIMEOUT, HTRACE_REST_CLIENT_TIMEOUT_DEFAULT); + this.httpClient.setConnectTimeout(timeout); + this.httpClient.setIdleTimeout(timeout); + int capacity = + conf.getInt(HTRACE_REST_CLIENT_Q_CAPACITY, HTRACE_REST_CLIENT_Q_CAPACITY_DEFAULT); + this.queue = new ArrayBlockingQueue<Span>(capacity, true); + // Build up the writeSpans URL. 
+ int port = conf.getInt(HTRACE_REST_CLIENT_PORT, HTRACE_REST_CLIENT_PORT_DEFAULT); + String hostname = conf.get(HTRACE_REST_CLIENT_HOSTNAME, HTRACE_REST_CLIENT_HOSTNAME_DEFAULT); + this.writeSpansRESTURL = "http://" + hostname + ":" + port + "/writeSpans"; + // Make a scheduler with one thread to run our POST of spans on a period. + this.scheduler = Executors.newScheduledThreadPool(1); + // Period at which we run the background thread that does the REST POST to htraced. + int periodInSeconds = + conf.getInt(HTRACE_REST_CLIENT_PERIOD_SECONDS, HTRACE_REST_CLIENT_PERIOD_SECONDS_DEFAULT); + // Maximum spans to send in one go + int maxToSendAtATime = + conf.getInt(HTRACE_REST_CLIENT_MAX_AT_A_TIME, HTRACE_REST_CLIENT_MAX_AT_A_TIME_DEFAULT); + this.scheduledFuture = + this.scheduler.scheduleAtFixedRate(new PostSpans(this.queue, maxToSendAtATime), + periodInSeconds, periodInSeconds, TimeUnit.SECONDS); + // Start up the httpclient. + this.httpClient.start(); + } + + /** + * POST spans runnable. + * Run on a period. Services the passed in queue taking spans and sending them to htraced via http. + */ + private class PostSpans implements Runnable { + private final Queue<Span> q; + // TODO: Make this configurable. + private final int maxToSendAtATime; + + private PostSpans(final Queue<Span> q, final int maxToSendAtATime) { + this.q = q; + this.maxToSendAtATime = maxToSendAtATime; + } + + @Override + public void run() { + Span span = null; + // Cycle until we drain the queue. Send maxToSendAtATime at a time if more than we can send + // in one go. + while ((span = this.q.poll()) != null) { + // We got a span. Send at least this one span. + Request request = httpClient.newRequest(writeSpansRESTURL).method(HttpMethod.POST); + request.header(HttpHeader.CONTENT_TYPE, "application/json"); + int count = 1; + request.content(new StringContentProvider(span.toJson())); + // Drain queue of spans if more than just one. 
+ while ((span = this.q.poll()) != null) { + request.content(new StringContentProvider(span.toJson())); + count++; + // If we've accumulated sufficient to send, go ahead and send what we have. Can do the + // rest in our next go around. + if (count > this.maxToSendAtATime) break; + } + try { + ContentResponse response = request.send(); + if (response.getStatus() == HttpStatus.OK_200) { + LOG.info("POSTED " + count + " spans"); + } else { + LOG.error("Status: " + response.getStatus()); + LOG.error(response.getHeaders()); + LOG.error(response.getContentAsString()); + } + } catch (InterruptedException e) { + LOG.error(e); + } catch (TimeoutException e) { + LOG.error(e); + } catch (ExecutionException e) { + LOG.error(e); + } + } + } + } + + @Override + public void close() throws IOException { + if (this.scheduledFuture != null) this.scheduledFuture.cancel(true); + if (this.scheduler == null) this.scheduler.shutdown(); + if (this.httpClient != null) { + try { + this.httpClient.stop(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + + @Override + public void receiveSpan(Span span) { + if (!this.queue.offer(span)) { + long now = System.currentTimeMillis(); + // Only log every 5 minutes. Any more than this for a guest process is obnoxious + if (now - lastAtCapacityWarningLog > 300000) { + LOG.warn("At capacity"); + this.lastAtCapacityWarningLog = now; + } + } + } + + /** + * Exercise our little span receiver. Presumes a running htraced instance at localhost:9095. 
+ * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + HTracedRESTReceiver receiver = new HTracedRESTReceiver(new HTraceConfiguration() { + @Override + public String get(String key) { + return null; + } + + @Override + public String get(String key, String defaultValue) { + return defaultValue; + } + }); + try { + // Do a basic GET /server/info against localhost:9095 htraced + ContentResponse response = receiver.httpClient.GET("http://localhost:9095/server/info"); + System.out.println(response.getMediaType()); + System.out.println(response.getContentAsString()); + // TODO: Fix MilliSpan. Requires a parentid. Shouldn't have to have one. + for (int i = 0; i < 100; i++) { + Span span = new MilliSpan.Builder().parents(new long [] {1L}).build(); + receiver.receiveSpan(span); + Thread.sleep(100); + } + } finally { + receiver.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/a521b558/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index bf85272..38ae4b3 100644 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,7 @@ language governing permissions and limitations under the License. --> <module>htrace-zipkin</module> <module>htrace-hbase</module> <module>htrace-flume</module> + <module>htrace-htraced</module> </modules> <licenses> @@ -209,16 +210,16 @@ language governing permissions and limitations under the License. 
--> </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.1</version> + </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.1</version> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-gpg-plugin</artifactId> </plugin> <plugin> @@ -306,6 +307,16 @@ language governing permissions and limitations under the License. --> <version>4.10</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.4.0</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.4.0</version> + </dependency> </dependencies> </dependencyManagement> <distributionManagement>
