Repository: incubator-htrace Updated Branches: refs/heads/master 9877e8127 -> 5911712f3
HTRACE-133. HTracedRESTReceiver drops spans when close() is called (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/5911712f Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/5911712f Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/5911712f Branch: refs/heads/master Commit: 5911712f3aed972564a28e0cf8501eda036bd4f0 Parents: 9877e81 Author: Colin P. Mccabe <[email protected]> Authored: Fri Mar 6 12:32:02 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Fri Mar 6 12:32:02 2015 -0800 ---------------------------------------------------------------------- .../apache/htrace/impl/HTracedRESTReceiver.java | 95 +++++++++++++------- .../htrace/impl/TestHTracedRESTReceiver.java | 53 ++++++++--- 2 files changed, 107 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5911712f/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java index ae1cfed..7edc2b8 100644 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java @@ -68,9 +68,10 @@ import org.eclipse.jetty.http.HttpStatus; public class HTracedRESTReceiver implements SpanReceiver { private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class); - // TODO: Take process name and add this to user agent? Would help debugging? - // @VisibleForTesting Protected so accessible from tests. - final HttpClient httpClient; + /** + * The HttpClient to use for this receiver. + */ + private final HttpClient httpClient; /** * The maximum number of spans to buffer. @@ -98,11 +99,16 @@ public class HTracedRESTReceiver implements SpanReceiver { private final Thread postSpansThread; /** - * Timeout in milliseconds. - * For now, it is read and connect timeout. + * The connection timeout in milliseconds. + */ + public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = "client.connect.timeout.ms"; + private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000; + + /** + * The idle timeout in milliseconds. */ - public static final String CLIENT_REST_TIMEOUT_MS_KEY = "client.rest.timeout.ms"; - private static final int CLIENT_REST_TIMEOUT_MS_DEFAULT = 60000; + public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = "client.idle.timeout.ms"; + private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000; /** * URL of the htraced REST server we are to talk to. @@ -164,19 +170,32 @@ public class HTracedRESTReceiver implements SpanReceiver { private boolean mustStartFlush; /** + * Create an HttpClient instance. + * + * @param connTimeout The timeout to use for connecting. + * @param idleTimeout The idle timeout to use. + */ + HttpClient createHttpClient(long connTimeout, long idleTimeout) { + HttpClient httpClient = new HttpClient(); + httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, + this.getClass().getSimpleName())); + httpClient.setConnectTimeout(connTimeout); + httpClient.setIdleTimeout(idleTimeout); + return httpClient; + } + + /** * Constructor. * You must call {@link #close()} post construction when done. * @param conf * @throws Exception */ public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception { - this.httpClient = new HttpClient(); - this.httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, - this.getClass().getSimpleName())); - // Use same timeout for connection and idle for now. - int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT); - this.httpClient.setConnectTimeout(timeout); - this.httpClient.setIdleTimeout(timeout); + int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY, + CLIENT_CONNECT_TIMEOUT_MS_DEFAULT); + int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY, + CLIENT_IDLE_TIMEOUT_MS_DEFAULT); + this.httpClient = createHttpClient(connTimeout, idleTimeout); this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT); this.spans = new ArrayDeque<Span>(capacity); // Build up the writeSpans URL. @@ -197,9 +216,10 @@ public class HTracedRESTReceiver implements SpanReceiver { this.postSpansThread.setName("PostSpans"); this.postSpansThread.start(); if (LOG.isDebugEnabled()) { - LOG.debug("Created new HTracedRESTReceiver with timeout=" + timeout + - ", capacity=" + capacity + ", url=" + url + ", periodInMs=" + - periodInMs + ", maxToSendAtATime=" + maxToSendAtATime); + LOG.debug("Created new HTracedRESTReceiver with connTimeout=" + + connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" + + capacity + ", url=" + url + ", periodInMs=" + periodInMs + + ", maxToSendAtATime=" + maxToSendAtATime); } } @@ -239,20 +259,24 @@ public class HTracedRESTReceiver implements SpanReceiver { lock.lock(); try { if (shutdown) { - LOG.info("Shutting down PostSpans thread..."); - break; - } - try { - waitNs = cond.awaitNanos(waitNs); - if (mustStartFlush) { + if (spans.isEmpty()) { + LOG.debug("Shutting down PostSpans thread..."); + break; + } + } else { + try { + waitNs = cond.awaitNanos(waitNs); + if (mustStartFlush) { + waitNs = 0; + mustStartFlush = false; + } + } catch (InterruptedException e) { + LOG.info("Got InterruptedException"); waitNs = 0; - mustStartFlush = false; } - } catch (InterruptedException e) { - LOG.info("Got InterruptedException"); - waitNs = 0; } - if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) { + if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) || + shutdown) { loadSpanBuf(); waitNs = periodInNs; } @@ -319,7 +343,7 @@ public class HTracedRESTReceiver implements SpanReceiver { @Override public void close() throws IOException { - LOG.info("Closing HTracedRESTReceiver(" + url + ")."); + LOG.debug("Closing HTracedRESTReceiver(" + url + ")."); lock.lock(); try { this.shutdown = true; @@ -328,7 +352,13 @@ public class HTracedRESTReceiver implements SpanReceiver { lock.unlock(); } try { - postSpansThread.join(30000); + postSpansThread.join(120000); + if (postSpansThread.isAlive()) { + LOG.error("Timed out without closing HTracedRESTReceiver(" + + url + ")."); + } else { + LOG.debug("Closed HTracedRESTReceiver(" + url + ")."); + } } catch (InterruptedException e) { LOG.error("Interrupted while joining postSpans", e); } @@ -364,6 +394,11 @@ public class HTracedRESTReceiver implements SpanReceiver { boolean added = false; lock.lock(); try { + if (shutdown) { + LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " + + "is already shut down."); + return; + } if (spans.size() < capacity) { spans.add(span); added = true; http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5911712f/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java index 676e348..eca6d6d 100644 --- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java @@ -31,6 +31,7 @@ import org.apache.htrace.Span; import org.apache.htrace.util.DataDir; import org.apache.htrace.util.HTracedProcess; import org.apache.htrace.util.TestUtil; +import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.http.HttpStatus; import org.junit.After; @@ -91,14 +92,18 @@ public class TestHTracedRESTReceiver { public void testBasicGet() throws Exception { HTracedRESTReceiver receiver = new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl)); + HttpClient http = receiver.createHttpClient(60000L, 60000L); + http.start(); try { // Do basic a GET /server/info against htraced - ContentResponse response = receiver.httpClient.GET(restServerUrl + "server/info"); + ContentResponse response = + http.GET(restServerUrl + "server/info"); assertEquals("application/json", response.getMediaType()); String content = processGET(response); assertTrue(content.contains("ReleaseVersion")); System.out.println(content); } finally { + http.stop(); receiver.close(); } } @@ -109,22 +114,25 @@ public class TestHTracedRESTReceiver { return response.getContentAsString(); } - /** - * Send 100 spans then confirm they made it in. - * @throws Exception - */ - @Test (timeout = 60000) - public void testSendingSpans() throws Exception { + private void testSendingSpansImpl(boolean testClose) throws Exception { final HTracedRESTReceiver receiver = new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl)); final int NUM_SPANS = 3; + final HttpClient http = receiver.createHttpClient(60000, 60000); + http.start(); try { for (int i = 0; i < NUM_SPANS; i++) { - Span span = new MilliSpan.Builder().parents(new long [] {1L}).spanId(i).build(); + Span span = new MilliSpan.Builder().parents( + new long [] {1L}).spanId(i).build(); LOG.info(span.toString()); receiver.receiveSpan(span); } - receiver.startFlushing(); + + if (testClose) { + receiver.close(); + } else { + receiver.startFlushing(); + } TestUtil.waitFor(new TestUtil.Supplier<Boolean>() { @Override public Boolean get() { @@ -134,7 +142,7 @@ public class TestHTracedRESTReceiver { // span id. String findSpan = String.format("span/%016x", i); ContentResponse response = - receiver.httpClient.GET(restServerUrl + findSpan); + http.GET(restServerUrl + findSpan); String content = processGET(response); if ((content == null) || (content.length() == 0)) { LOG.info("Failed to find span " + i); @@ -150,7 +158,30 @@ public class TestHTracedRESTReceiver { } }, 10, 20000); } finally { - receiver.close(); + http.stop(); + if (!testClose) { + receiver.close(); + } } } + + /** + * Send 100 spans then confirm they made it in. + * @throws Exception + */ + @Test (timeout = 60000) + public void testSendingSpans() throws Exception { + testSendingSpansImpl(false); + } + + /** + * Test that the REST receiver blocks during shutdown until all spans are sent + * (or a long timeout elapses). Otherwise, short-lived client processes will + * never have a chance to send all their spans and we will have incomplete + * information. + */ + @Test (timeout = 60000) + public void testShutdownBlocksUntilSpanAreSent() throws Exception { + testSendingSpansImpl(true); + } }
