Repository: incubator-htrace
Updated Branches:
  refs/heads/master 9877e8127 -> 5911712f3


HTRACE-133. HTracedRESTReceiver drops spans when close() is called (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/5911712f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/5911712f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/5911712f

Branch: refs/heads/master
Commit: 5911712f3aed972564a28e0cf8501eda036bd4f0
Parents: 9877e81
Author: Colin P. Mccabe <[email protected]>
Authored: Fri Mar 6 12:32:02 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Fri Mar 6 12:32:02 2015 -0800

----------------------------------------------------------------------
 .../apache/htrace/impl/HTracedRESTReceiver.java | 95 +++++++++++++-------
 .../htrace/impl/TestHTracedRESTReceiver.java    | 53 ++++++++---
 2 files changed, 107 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5911712f/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
----------------------------------------------------------------------
diff --git 
a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java 
b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
index ae1cfed..7edc2b8 100644
--- 
a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
+++ 
b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java
@@ -68,9 +68,10 @@ import org.eclipse.jetty.http.HttpStatus;
 public class HTracedRESTReceiver implements SpanReceiver {
   private static final Log LOG = LogFactory.getLog(HTracedRESTReceiver.class);
 
-  // TODO: Take process name and add this to user agent?  Would help debugging?
-  // @VisibleForTesting Protected so accessible from tests.
-  final HttpClient httpClient;
+  /**
+   * The HttpClient to use for this receiver.
+   */
+  private final HttpClient httpClient;
 
   /**
    * The maximum number of spans to buffer.
@@ -98,11 +99,16 @@ public class HTracedRESTReceiver implements SpanReceiver {
   private final Thread postSpansThread;
 
   /**
-   * Timeout in milliseconds.
-   * For now, it is read and connect timeout.
+   * The connection timeout in milliseconds.
+   */
+  public static final String CLIENT_CONNECT_TIMEOUT_MS_KEY = 
"client.connect.timeout.ms";
+  private static final int CLIENT_CONNECT_TIMEOUT_MS_DEFAULT = 30000;
+
+  /**
+   * The idle timeout in milliseconds.
    */
-  public static final String CLIENT_REST_TIMEOUT_MS_KEY = 
"client.rest.timeout.ms";
-  private static final int CLIENT_REST_TIMEOUT_MS_DEFAULT = 60000;
+  public static final String CLIENT_IDLE_TIMEOUT_MS_KEY = 
"client.idle.timeout.ms";
+  private static final int CLIENT_IDLE_TIMEOUT_MS_DEFAULT = 120000;
 
   /**
    * URL of the htraced REST server we are to talk to.
@@ -164,19 +170,32 @@ public class HTracedRESTReceiver implements SpanReceiver {
   private boolean mustStartFlush;
 
   /**
+   * Create an HttpClient instance.
+   *
+   * @param connTimeout         The timeout to use for connecting.
+   * @param idleTimeout         The idle timeout to use.
+   */
+  HttpClient createHttpClient(long connTimeout, long idleTimeout) {
+    HttpClient httpClient = new HttpClient();
+    httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
+      this.getClass().getSimpleName()));
+    httpClient.setConnectTimeout(connTimeout);
+    httpClient.setIdleTimeout(idleTimeout);
+    return httpClient;
+  }
+
+  /**
    * Constructor.
    * You must call {@link #close()} post construction when done.
    * @param conf
    * @throws Exception
    */
   public HTracedRESTReceiver(final HTraceConfiguration conf) throws Exception {
-    this.httpClient = new HttpClient();
-    this.httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT,
-      this.getClass().getSimpleName()));
-    // Use same timeout for connection and idle for now.
-    int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, 
CLIENT_REST_TIMEOUT_MS_DEFAULT);
-    this.httpClient.setConnectTimeout(timeout);
-    this.httpClient.setIdleTimeout(timeout);
+    int connTimeout = conf.getInt(CLIENT_CONNECT_TIMEOUT_MS_KEY,
+                                  CLIENT_CONNECT_TIMEOUT_MS_DEFAULT);
+    int idleTimeout = conf.getInt(CLIENT_IDLE_TIMEOUT_MS_KEY,
+                                  CLIENT_IDLE_TIMEOUT_MS_DEFAULT);
+    this.httpClient = createHttpClient(connTimeout, idleTimeout);
     this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, 
CLIENT_REST_QUEUE_CAPACITY_DEFAULT);
     this.spans = new ArrayDeque<Span>(capacity);
     // Build up the writeSpans URL.
@@ -197,9 +216,10 @@ public class HTracedRESTReceiver implements SpanReceiver {
     this.postSpansThread.setName("PostSpans");
     this.postSpansThread.start();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Created new HTracedRESTReceiver with timeout=" + timeout +
-            ", capacity=" + capacity + ", url=" + url +  ", periodInMs=" +
-            periodInMs + ", maxToSendAtATime=" + maxToSendAtATime);
+      LOG.debug("Created new HTracedRESTReceiver with connTimeout=" +
+            connTimeout + ", idleTimeout = " + idleTimeout + ", capacity=" +
+            capacity + ", url=" + url +  ", periodInMs=" + periodInMs +
+            ", maxToSendAtATime=" + maxToSendAtATime);
     }
   }
 
@@ -239,20 +259,24 @@ public class HTracedRESTReceiver implements SpanReceiver {
           lock.lock();
           try {
             if (shutdown) {
-              LOG.info("Shutting down PostSpans thread...");
-              break;
-            }
-            try {
-              waitNs = cond.awaitNanos(waitNs);
-              if (mustStartFlush) {
+              if (spans.isEmpty()) {
+                LOG.debug("Shutting down PostSpans thread...");
+                break;
+              }
+            } else {
+              try {
+                waitNs = cond.awaitNanos(waitNs);
+                if (mustStartFlush) {
+                  waitNs = 0;
+                  mustStartFlush = false;
+                }
+              } catch (InterruptedException e) {
+                LOG.info("Got InterruptedException");
                 waitNs = 0;
-                mustStartFlush = false;
               }
-            } catch (InterruptedException e) {
-              LOG.info("Got InterruptedException");
-              waitNs = 0;
             }
-            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) {
+            if ((spans.size() > maxToSendAtATime) || (waitNs <= 0) ||
+                    shutdown) {
               loadSpanBuf();
               waitNs = periodInNs;
             }
@@ -319,7 +343,7 @@ public class HTracedRESTReceiver implements SpanReceiver {
 
   @Override
   public void close() throws IOException {
-    LOG.info("Closing HTracedRESTReceiver(" + url + ").");
+    LOG.debug("Closing HTracedRESTReceiver(" + url + ").");
     lock.lock();
     try {
       this.shutdown = true;
@@ -328,7 +352,13 @@ public class HTracedRESTReceiver implements SpanReceiver {
       lock.unlock();
     }
     try {
-      postSpansThread.join(30000);
+      postSpansThread.join(120000);
+      if (postSpansThread.isAlive()) {
+        LOG.error("Timed out without closing HTracedRESTReceiver(" +
+                  url + ").");
+      } else {
+        LOG.debug("Closed HTracedRESTReceiver(" + url + ").");
+      }
     } catch (InterruptedException e) {
       LOG.error("Interrupted while joining postSpans", e);
     }
@@ -364,6 +394,11 @@ public class HTracedRESTReceiver implements SpanReceiver {
     boolean added = false;
     lock.lock();
     try {
+      if (shutdown) {
+        LOG.trace("receiveSpan(span=" + span + "): HTracedRESTReceiver " +
+            "is already shut down.");
+        return;
+      }
       if (spans.size() < capacity) {
         spans.add(span);
         added = true;

http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5911712f/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
----------------------------------------------------------------------
diff --git 
a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
 
b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
index 676e348..eca6d6d 100644
--- 
a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
+++ 
b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java
@@ -31,6 +31,7 @@ import org.apache.htrace.Span;
 import org.apache.htrace.util.DataDir;
 import org.apache.htrace.util.HTracedProcess;
 import org.apache.htrace.util.TestUtil;
+import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.ContentResponse;
 import org.eclipse.jetty.http.HttpStatus;
 import org.junit.After;
@@ -91,14 +92,18 @@ public class TestHTracedRESTReceiver {
   public void testBasicGet() throws Exception {
     HTracedRESTReceiver receiver =
       new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
+    HttpClient http = receiver.createHttpClient(60000L, 60000L);
+    http.start();
     try {
       // Do basic a GET /server/info against htraced
-      ContentResponse response = receiver.httpClient.GET(restServerUrl + 
"server/info");
+      ContentResponse response =
+        http.GET(restServerUrl + "server/info");
       assertEquals("application/json", response.getMediaType());
       String content = processGET(response);
       assertTrue(content.contains("ReleaseVersion"));
       System.out.println(content);
     } finally {
+      http.stop();
       receiver.close();
     }
   }
@@ -109,22 +114,25 @@ public class TestHTracedRESTReceiver {
     return response.getContentAsString();
   }
 
-  /**
-   * Send 100 spans then confirm they made it in.
-   * @throws Exception
-   */
-  @Test (timeout = 60000)
-  public void testSendingSpans() throws Exception {
+  private void testSendingSpansImpl(boolean testClose) throws Exception {
     final HTracedRESTReceiver receiver =
       new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl));
     final int NUM_SPANS = 3;
+    final HttpClient http = receiver.createHttpClient(60000, 60000);
+    http.start();
     try {
       for (int i = 0; i < NUM_SPANS; i++) {
-        Span span = new MilliSpan.Builder().parents(new long [] 
{1L}).spanId(i).build();
+        Span span = new MilliSpan.Builder().parents(
+            new long [] {1L}).spanId(i).build();
         LOG.info(span.toString());
         receiver.receiveSpan(span);
       }
-      receiver.startFlushing();
+
+      if (testClose) {
+        receiver.close();
+      } else {
+        receiver.startFlushing();
+      }
       TestUtil.waitFor(new TestUtil.Supplier<Boolean>() {
         @Override
         public Boolean get() {
@@ -134,7 +142,7 @@ public class TestHTracedRESTReceiver {
               // span id.
               String findSpan = String.format("span/%016x", i);
               ContentResponse response =
-                  receiver.httpClient.GET(restServerUrl + findSpan);
+                  http.GET(restServerUrl + findSpan);
               String content = processGET(response);
               if ((content == null) || (content.length() == 0)) {
                 LOG.info("Failed to find span " + i);
@@ -150,7 +158,30 @@ public class TestHTracedRESTReceiver {
         }
       }, 10, 20000);
     } finally {
-      receiver.close();
+      http.stop();
+      if (!testClose) {
+        receiver.close();
+      }
     }
   }
+
+  /**
+   * Send 100 spans then confirm they made it in.
+   * @throws Exception
+   */
+  @Test (timeout = 60000)
+  public void testSendingSpans() throws Exception {
+    testSendingSpansImpl(false);
+  }
+
+  /**
+   * Test that the REST receiver blocks during shutdown until all spans are 
sent
+   * (or a long timeout elapses).  Otherwise, short-lived client processes will
+   * never have a chance to send all their spans and we will have incomplete
+   * information.
+   */
+  @Test (timeout = 60000)
+  public void testShutdownBlocksUntilSpanAreSent() throws Exception {
+    testSendingSpansImpl(true);
+  }
 }

Reply via email to