Repository: incubator-htrace Updated Branches: refs/heads/master 56dd48b5a -> 1eaf7bd09
HTRACE-109. fix TestHTracedRESTReceiver unit test failures (cmccabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/1eaf7bd0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/1eaf7bd0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/1eaf7bd0 Branch: refs/heads/master Commit: 1eaf7bd09cbc0fa456fe3191c193850023969146 Parents: 56dd48b Author: Colin P. Mccabe <[email protected]> Authored: Wed Feb 18 14:30:48 2015 -0800 Committer: Colin P. Mccabe <[email protected]> Committed: Wed Feb 18 14:39:53 2015 -0800 ---------------------------------------------------------------------- .../src/org/apache/htrace/conf/config_keys.go | 5 + .../go/src/org/apache/htrace/htraced/htraced.go | 46 ++- .../go/src/org/apache/htrace/htraced/rest.go | 15 + .../java/org/apache/htrace/impl/MilliSpan.java | 3 +- .../java/org/apache/htrace/util/TestUtil.java | 91 ++++++ .../apache/htrace/impl/HTracedRESTReceiver.java | 286 ++++++++++++++----- .../htrace/impl/TestHTracedRESTReceiver.java | 64 +++-- .../org/apache/htrace/util/HTracedProcess.java | 109 +++++-- .../apache/htrace/util/TestHTracedProcess.java | 29 +- 9 files changed, 516 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go index b22e312..ba63f2d 100644 --- a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go +++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go @@ -62,6 +62,11 @@ const HTRACE_LOG_PATH = "log.path" // The log level to use for the logs in htrace. 
const HTRACE_LOG_LEVEL = "log.level" +// A host:port pair to send information to on startup. This is used in unit +// tests to determine the (random) port of the htraced process that has been +// started. +const HTRACE_STARTUP_NOTIFICATION_ADDRESS = "startup.notification.address" + // Default values for HTrace configuration keys. var DEFAULTS = map[string]string{ HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go index d2cbafc..191b68e 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go @@ -20,7 +20,9 @@ package main import ( + "encoding/json" "fmt" + "net" "org/apache/htrace/common" "org/apache/htrace/conf" "os" @@ -69,12 +71,54 @@ func main() { lg.Errorf("Error creating datastore: %s\n", err.Error()) os.Exit(1) } - _, err = CreateRestServer(cnf, store) + var rsv *RestServer + rsv, err = CreateRestServer(cnf, store) if err != nil { lg.Errorf("Error creating REST server: %s\n", err.Error()) os.Exit(1) } + naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS) + if naddr != "" { + notif := StartupNotification{ + HttpAddr: rsv.Addr().String(), + ProcessId: os.Getpid(), + } + err = sendStartupNotification(naddr, &notif) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+ + "%s\n", err.Error()) + os.Exit(1) + } + } for { time.Sleep(time.Duration(10) * time.Hour) } } + +// A startup notification message that we optionally send on startup. +// Used by unit tests.
+type StartupNotification struct { + HttpAddr string + ProcessId int +} + +func sendStartupNotification(naddr string, notif *StartupNotification) error { + conn, err := net.Dial("tcp", naddr) + if err != nil { + return err + } + defer func() { + if conn != nil { + conn.Close() + } + }() + var buf []byte + buf, err = json.Marshal(notif) + if err != nil { + return err + } + _, err = conn.Write(buf) + conn.Close() + conn = nil + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go index 9cdab20..495aed0 100644 --- a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go @@ -112,6 +112,7 @@ func (hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) if !ok { return } + hand.lg.Debugf("findSidHandler(sid=%s)\n", common.SpanId(sid)) span := hand.store.FindSpan(sid) if span == nil { writeError(hand.lg, w, http.StatusNoContent, fmt.Sprintf("No such span as %s\n", @@ -139,6 +140,7 @@ func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ if !ok { return } + hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", common.SpanId(sid), lim) children := hand.store.FindChildren(sid, lim) jbytes, err := json.Marshal(children) if err != nil { @@ -170,6 +172,7 @@ func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } spans = append(spans, &span) } + hand.lg.Debugf("writeSpansHandler: received %d span(s).\n", len(spans)) for spanIdx := range spans { hand.lg.Debugf("writing span %s\n", spans[spanIdx].ToJson()) hand.store.WriteSpan(spans[spanIdx]) @@ -238,6 +241,15 @@ func (hand *defaultServeHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ 
w.Write([]byte(rsc)) } +type logErrorHandler struct { + lg *common.Logger +} + +func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + hand.lg.Errorf("Got unknown request %s\n", req.RequestURI) + writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.") +} + type RestServer struct { listener net.Listener lg *common.Logger @@ -280,6 +292,9 @@ func CreateRestServer(cnf *conf.Config, store *dataStore) (*RestServer, error) { // Default Handler. This will serve requests for static requests. r.PathPrefix("/").Handler(&defaultServeHandler{lg: rsv.lg}).Methods("GET") + // Log an error message for unknown non-GET requests. + r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg}) + go http.Serve(rsv.listener, r) rsv.lg.Infof("Started REST server on %s...\n", rsv.listener.Addr().String()) http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java index be5521a..4467208 100644 --- a/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java +++ b/htrace-core/src/main/java/org/apache/htrace/impl/MilliSpan.java @@ -50,6 +50,7 @@ public class MilliSpan implements Span { private static Random rand = new Random(); private static ObjectWriter JSON_WRITER = new ObjectMapper().writer(); + private static final long EMPTY_PARENT_ARRAY[] = new long[0]; private long begin; private long end; @@ -74,7 +75,7 @@ public class MilliSpan implements Span { private long end; private String description; private long traceId; - private long parents[]; + private long parents[] = EMPTY_PARENT_ARRAY; private long spanId; private Map<String, String> traceInfo = null; private String processId; 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java new file mode 100644 index 0000000..7cb4aed --- /dev/null +++ b/htrace-core/src/test/java/org/apache/htrace/util/TestUtil.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.util; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utilities for writing unit tests. + */ +public class TestUtil { + /** + * Get a dump of the stack traces of all threads. 
+ */ + public static String threadDump() { + StringBuilder dump = new StringBuilder(); + Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces(); + for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) { + Thread thread = e.getKey(); + dump.append(String.format( + "\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", + thread.getName(), + (thread.isDaemon() ? "daemon" : ""), + thread.getPriority(), + thread.getId(), + Thread.State.WAITING.equals(thread.getState()) ? + "in Object.wait()" : thread.getState().name().toLowerCase(), + Thread.State.WAITING.equals(thread.getState()) ? + "WAITING (on object monitor)" : thread.getState())); + for (StackTraceElement stackTraceElement : e.getValue()) { + dump.append("\n at "); + dump.append(stackTraceElement); + } + dump.append("\n"); + } + return dump.toString(); + } + + /** + * A callback which returns a value of type T. + * + * TODO: remove this when we're on Java 8, in favor of + * java.util.function.Supplier. + */ + public interface Supplier<T> { + T get(); + } + + /** + * Wait for a condition to become true for a configurable amount of time. + * + * @param check The condition to wait for. + * @param periodMs How often to check the condition, in milliseconds. + * @param timeoutMs How long to wait in total, in milliseconds. + */ + public static void waitFor(Supplier<Boolean> check, + long periodMs, long timeoutMs) + throws TimeoutException, InterruptedException + { + long endNs = System.nanoTime() + + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS); + while (true) { + boolean result = check.get(); + if (result) { + return; + } + long nowNs = System.nanoTime(); + if (nowNs >= endNs) { + throw new TimeoutException("Timed out waiting for test condition. 
" + + "Thread dump:\n" + threadDump()); + } + Thread.sleep(periodMs); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java index 35cd332..d730a17 100644 --- a/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/HTracedRESTReceiver.java @@ -20,14 +20,13 @@ package org.apache.htrace.impl; import java.io.IOException; import java.net.URL; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.ArrayDeque; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,19 +73,29 @@ public class HTracedRESTReceiver implements SpanReceiver { final HttpClient httpClient; /** + * The maximum number of spans to buffer. + */ + private final int capacity; + + /** * REST URL to use writing Spans. */ - private final String writeSpansRESTURL; + private final String url; + + /** + * The maximum number of spans to send in a single PUT. + */ + private final int maxToSendAtATime; /** * Runs background task to do the REST PUT. 
*/ - private final ScheduledExecutorService scheduler; + private final PostSpans postSpans; /** - * Keep around reference so can cancel on close any running scheduled task. + * Thread for postSpans */ - private final ScheduledFuture<?> scheduledFuture; + private final Thread postSpansThread; /** * Timeout in milliseconds. @@ -111,8 +120,8 @@ public class HTracedRESTReceiver implements SpanReceiver { /** * Period at which the background thread that does the REST POST to htraced in ms. */ - public static final String CLIENT_REST_PERIOD_MS_KEY = "client.reset.period.ms"; - private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 1000; + public static final String CLIENT_REST_PERIOD_MS_KEY = "client.rest.period.ms"; + private static final int CLIENT_REST_PERIOD_MS_DEFAULT = 30000; /** * Maximum spans to post to htraced at a time. @@ -122,15 +131,37 @@ public class HTracedRESTReceiver implements SpanReceiver { private static final int CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT = 100; /** + * Lock protecting the PostSpans data. + */ + private ReentrantLock lock = new ReentrantLock(); + + /** + * Condition variable used to wake up the PostSpans thread. + */ + private Condition cond = lock.newCondition(); + + /** + * True if we should shut down. + * Protected by the lock. + */ + private boolean shutdown = false; + + /** * Simple bounded queue to hold spans between periodic runs of the httpclient. + * Protected by the lock. */ - private final Queue<Span> queue; + private final ArrayDeque<Span> spans; /** * Keep last time we logged we were at capacity; used to prevent flooding of logs with * "at capacity" messages. */ - private volatile long lastAtCapacityWarningLog = 0L; + private AtomicLong lastAtCapacityWarningLog = new AtomicLong(0L); + + /** + * True if we should flush as soon as possible. Protected by the lock. + */ + private boolean mustStartFlush; /** * Constructor. 
@@ -146,25 +177,25 @@ public class HTracedRESTReceiver implements SpanReceiver { int timeout = conf.getInt(CLIENT_REST_TIMEOUT_MS_KEY, CLIENT_REST_TIMEOUT_MS_DEFAULT); this.httpClient.setConnectTimeout(timeout); this.httpClient.setIdleTimeout(timeout); - int capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT); - this.queue = new ArrayBlockingQueue<Span>(capacity, true); + this.capacity = conf.getInt(CLIENT_REST_QUEUE_CAPACITY_KEY, CLIENT_REST_QUEUE_CAPACITY_DEFAULT); + this.spans = new ArrayDeque<Span>(capacity); // Build up the writeSpans URL. URL restServer = new URL(conf.get(HTRACED_REST_URL_KEY, HTRACED_REST_URL_DEFAULT)); - URL url = - new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans"); - this.writeSpansRESTURL = url.toString(); - // Make a scheduler with one thread to run our POST of spans on a period. - this.scheduler = Executors.newScheduledThreadPool(1); + URL url = new URL(restServer.getProtocol(), restServer.getHost(), restServer.getPort(), "/writeSpans"); + this.url = url.toString(); // Period at which we run the background thread that does the REST POST to htraced. int periodInMs = conf.getInt(CLIENT_REST_PERIOD_MS_KEY, CLIENT_REST_PERIOD_MS_DEFAULT); // Maximum spans to send in one go - int maxToSendAtATime = + this.maxToSendAtATime = conf.getInt(CLIENT_REST_MAX_SPANS_AT_A_TIME_KEY, CLIENT_REST_MAX_SPANS_AT_A_TIME_DEFAULT); - this.scheduledFuture = - this.scheduler.scheduleAtFixedRate(new PostSpans(this.queue, maxToSendAtATime), - periodInMs, periodInMs, TimeUnit.MILLISECONDS); // Start up the httpclient. this.httpClient.start(); + // Start the background thread. 
+ this.postSpans = new PostSpans(periodInMs); + this.postSpansThread = new Thread(postSpans); + this.postSpansThread.setDaemon(true); + this.postSpansThread.setName("PostSpans"); + this.postSpansThread.start(); } /** @@ -172,81 +203,188 @@ public class HTracedRESTReceiver implements SpanReceiver { * Run on a period. Services the passed in queue taking spans and sending them to traced via http. */ private class PostSpans implements Runnable { - private final Queue<Span> q; - private final int maxToSendAtATime; + private final long periodInNs; + private final ArrayDeque<Span> spanBuf; - private PostSpans(final Queue<Span> q, final int maxToSendAtATime) { - this.q = q; - this.maxToSendAtATime = maxToSendAtATime; + private PostSpans(long periodInMs) { + this.periodInNs = TimeUnit.NANOSECONDS. + convert(periodInMs, TimeUnit.MILLISECONDS); + this.spanBuf = new ArrayDeque<Span>(maxToSendAtATime); } + /** + * The span sending thread. + * + * We send a batch of spans for one of two reasons: there are already + * maxToSendAtATime spans in the buffer, or the client.rest.period.ms + * has elapsed. The idea is that we want to strike a balance between + * sending a lot of spans at a time, for efficiency purposes, and + * making sure that we don't buffer spans locally for too long. + * + * The longer we buffer spans locally, the longer we will have to wait + * to see the results of our client operations in the GUI, and the higher + * the risk of losing them if the client crashes. + */ @Override public void run() { - Span span = null; - // Cycle until we drain queue. Send maxToSendAtATime if more than this in queue. - while ((span = this.q.poll()) != null) { - // We got a span. Send at least this one span. 
- Request request = httpClient.newRequest(writeSpansRESTURL).method(HttpMethod.POST); + long waitNs; + try { + waitNs = periodInNs; + while (true) { + lock.lock(); + try { + if (shutdown) { + LOG.info("Shutting down PostSpans thread..."); + break; + } + try { + waitNs = cond.awaitNanos(waitNs); + if (mustStartFlush) { + waitNs = 0; + mustStartFlush = false; + } + } catch (InterruptedException e) { + LOG.info("Got InterruptedException"); + waitNs = 0; + } + if ((spans.size() > maxToSendAtATime) || (waitNs <= 0)) { + loadSpanBuf(); + waitNs = periodInNs; + } + } finally { + lock.unlock(); + } + // Once the lock has been safely released, we can do some network + // I/O without blocking the client process. + if (!spanBuf.isEmpty()) { + sendSpans(); + spanBuf.clear(); + } + } + } finally { + if (httpClient != null) { + try { + httpClient.stop(); + } catch (Exception e) { + LOG.error("Error shutting down httpClient", e); + } + } + spans.clear(); + } + } + + private void loadSpanBuf() { + for (int loaded = 0; loaded < maxToSendAtATime; loaded++) { + Span span = spans.pollFirst(); + if (span == null) { + return; + } + spanBuf.add(span); + } + } + + private void sendSpans() { + try { + Request request = httpClient.newRequest(url).method(HttpMethod.POST); request.header(HttpHeader.CONTENT_TYPE, "application/json"); - int count = 1; - request.content(new StringContentProvider(span.toJson())); - // Drain queue or until we have maxToSendAtATime spans, if more than just one. - while ((span = this.q.poll()) != null) { - request.content(new StringContentProvider(span.toJson())); - count++; - // If we've accumulated sufficient to send, go ahead and send what we have. Can do the - // rest in out next go around. 
- if (count > this.maxToSendAtATime) break; + StringBuilder bld = new StringBuilder(); + for (Span span : spanBuf) { + bld.append(span.toJson()); } - try { - ContentResponse response = request.send(); - if (response.getStatus() == HttpStatus.OK_200) { - if (LOG.isDebugEnabled()) LOG.debug("POSTED " + count + " spans"); - } else { - LOG.error("Status: " + response.getStatus()); - LOG.error(response.getHeaders()); - LOG.error(response.getContentAsString()); + request.content(new StringContentProvider(bld.toString())); + ContentResponse response = request.send(); + if (response.getStatus() == HttpStatus.OK_200) { + if (LOG.isDebugEnabled()) { + LOG.debug("POSTED " + spanBuf.size() + " spans"); } - } catch (InterruptedException e) { - LOG.error(e); - } catch (TimeoutException e) { - LOG.error(e); - } catch (ExecutionException e) { - LOG.error(e); + } else { + LOG.error("Status: " + response.getStatus()); + LOG.error(response.getHeaders()); + LOG.error(response.getContentAsString()); } + } catch (InterruptedException e) { + LOG.error(e); + } catch (TimeoutException e) { + LOG.error(e); + } catch (ExecutionException e) { + LOG.error(e); } } } @Override public void close() throws IOException { - if (this.scheduledFuture != null) this.scheduledFuture.cancel(true); - if (this.scheduler == null) this.scheduler.shutdown(); - if (this.httpClient != null) { - try { - this.httpClient.stop(); - } catch (Exception e) { - throw new IOException(e); - } + LOG.info("Closing HTracedRESTReceiver."); + lock.lock(); + try { + this.shutdown = true; + cond.signal(); + } finally { + lock.unlock(); + } + try { + postSpansThread.join(30000); + } catch (InterruptedException e) { + LOG.error("Interrupted while joining postSpans", e); } } - // @VisibleForTesting - boolean isQueueEmpty() { - return this.queue.isEmpty(); + /** + * Start flushing the buffered spans. + * + * Note that even after calling this function, you will still have to wait + * for the flush to finish happening. 
This function just starts the flush; + * it does not block until it has completed. You also do not get + * "read-after-write consistency" with htraced... the spans that are + * written may be buffered for a short period of time prior to being + * readable. This is not a problem for production use (since htraced is not + * a database), but it means that most unit tests will need a loop in their + * "can I read what I wrote" tests. + */ + void startFlushing() { + LOG.info("Triggering HTracedRESTReceiver flush."); + lock.lock(); + try { + mustStartFlush = true; + cond.signal(); + } finally { + lock.unlock(); + } } private static long WARN_TIMEOUT_MS = 300000; @Override public void receiveSpan(Span span) { - if (!this.queue.offer(span)) { - // TODO: If failed the offer, run the background thread now. I can't block though? + boolean added = false; + lock.lock(); + try { + if (spans.size() < capacity) { + spans.add(span); + added = true; + if (spans.size() >= maxToSendAtATime) { + cond.signal(); + } + } else { + cond.signal(); + } + } finally { + lock.unlock(); + } + if (!added) { long now = System.nanoTime() / 1000000L; - // Only log every 5 minutes. Any more than this for a guest process is obnoxious - if (now - lastAtCapacityWarningLog > WARN_TIMEOUT_MS) { - LOG.warn("At capacity"); - this.lastAtCapacityWarningLog = now; + long last = lastAtCapacityWarningLog.get(); + if (now - last > WARN_TIMEOUT_MS) { + // Only log every 5 minutes. Any more than this for a guest process + // is obnoxious. + if (lastAtCapacityWarningLog.compareAndSet(last, now)) { + // If the atomic-compare-and-set succeeds, we should log. Otherwise, + // we should assume another thread already logged and bumped up the + // value of lastAtCapacityWarning sometime between our get and the + // "if" statement. + LOG.warn("There are too many HTrace spans to buffer! We have " + + "already buffered " + capacity + " spans. 
Dropping spans."); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java index fe9f1c0..b1f1b11 100644 --- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java @@ -18,6 +18,7 @@ package org.apache.htrace.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.File; @@ -29,6 +30,7 @@ import org.apache.htrace.HTraceConfiguration; import org.apache.htrace.Span; import org.apache.htrace.util.DataDir; import org.apache.htrace.util.HTracedProcess; +import org.apache.htrace.util.TestUtil; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.http.HttpStatus; import org.junit.After; @@ -36,20 +38,20 @@ import org.junit.Before; import org.junit.Test; public class TestHTracedRESTReceiver { - private static final Log LOG = LogFactory.getLog(TestHTracedRESTReceiver.class); - private URL restServerUrl;; + private static final Log LOG = + LogFactory.getLog(TestHTracedRESTReceiver.class); + private URL restServerUrl; private DataDir dataDir; HTracedProcess htraced; @Before public void setUp() throws Exception { this.dataDir = new DataDir(); - // Start on 9097. Would be better to start at port 0 and then ask server what port it managed - // to come up on. 
- this.restServerUrl = new URL("http://localhost:9097/"); File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir()); File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir); - this.htraced = new HTracedProcess(pathToHTracedBinary, dataDir.getDataDir(), restServerUrl); + this.htraced = new HTracedProcess(pathToHTracedBinary, + dataDir.getDataDir(), "localhost"); + this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/"); } @After @@ -75,6 +77,7 @@ public class TestHTracedRESTReceiver { @Override public String get(String key, String defaultValue) { if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) { + LOG.info("WATERMELON2: got request for htraced.rest.url. Returning " + this.restServerUrl.toString()); return this.restServerUrl.toString(); } return defaultValue; @@ -90,7 +93,7 @@ public class TestHTracedRESTReceiver { HTracedRESTReceiver receiver = new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl)); try { - // Do basic a GET /server/info against localhost:9095 htraced + // Do basic a GET /server/info against htraced ContentResponse response = receiver.httpClient.GET(restServerUrl + "server/info"); assertEquals("application/json", response.getMediaType()); String content = processGET(response); @@ -111,31 +114,44 @@ public class TestHTracedRESTReceiver { * Send 100 spans then confirm they made it in. * @throws Exception */ - @Test (timeout = 10000) + @Test (timeout = 60000) public void testSendingSpans() throws Exception { - HTracedRESTReceiver receiver = + final HTracedRESTReceiver receiver = new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl)); + final int NUM_SPANS = 3; try { - // TODO: Fix MilliSpan. Requires a parentid. Shouldn't have to have one else be explicit it - // is required. 
- for (int i = 0; i < 100; i++) { + for (int i = 0; i < NUM_SPANS; i++) { Span span = new MilliSpan.Builder().parents(new long [] {1L}).spanId(i).build(); LOG.info(span.toString()); receiver.receiveSpan(span); } - // Wait for the queue to empty before we go to check they made it over. - while (receiver.isQueueEmpty()) Thread.sleep(1); - // Read them all back. - for (int i = 0; i < 100; i++) { - // This is what the REST server expends when querying for a span id. - String findSpan = String.format("span/%016x", i); - ContentResponse response = receiver.httpClient.GET(restServerUrl + findSpan); - String content = processGET(response); - assertTrue(content != null && content.length() > 0); - LOG.info(content); - } + receiver.startFlushing(); + TestUtil.waitFor(new TestUtil.Supplier<Boolean>() { + @Override + public Boolean get() { + try { + for (int i = 0; i < NUM_SPANS; i++) { + // This is what the REST server expects when querying for a + // span id. + String findSpan = String.format("span/%016x", i); + ContentResponse response = + receiver.httpClient.GET(restServerUrl + findSpan); + String content = processGET(response); + if ((content == null) || (content.length() == 0)) { + LOG.info("Failed to find span " + i); + return false; + } + LOG.info("Got " + content + " for span " + i); + } + return true; + } catch (Throwable t) { + LOG.error("Got exception", t); + return false; + } + } + }, 10, 20000); } finally { receiver.close(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java index 12343f7..e319925 100644 --- a/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java +++ 
b/htrace-htraced/src/test/java/org/apache/htrace/util/HTracedProcess.java @@ -16,12 +16,19 @@ */ package org.apache.htrace.util; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.ProcessBuilder.Redirect; +import java.net.ServerSocket; +import java.net.Socket; import java.net.URL; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * To get instance of HTraced up and running, create an instance of this class. @@ -29,30 +36,86 @@ import java.net.URL; * host data (leveldbs and logs). * TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff * moves. - * TODO: What if a port clash? How to have it come up another port then ask the process what port - * it is running on? */ public class HTracedProcess extends Process { + private static final Log LOG = LogFactory.getLog(HTracedProcess.class); private final Process delegate; - public HTracedProcess(final File pathToHTracedBinary, final File dataDir, final URL url) - throws IOException { - // web.address for htraced is hostname ':' port; no 'scheme' yet. - String webAddress = url.getHost() + ":" + url.getPort(); - // Pass cmdline args to htraced to it uses our test dir for data. - ProcessBuilder pb = new ProcessBuilder(pathToHTracedBinary.toString(), - " -Dlog.level=TRACE", - "-Dweb.address=" + webAddress, - "-Ddata.store.clear=true", - "-Ddata.store.directories=" + dataDir.toString()); - pb.redirectErrorStream(true); - // Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later. 
- pb.inheritIO(); - pb.directory(dataDir); - this.delegate = pb.start(); - assert pb.redirectInput() == Redirect.PIPE; - assert pb.redirectOutput().file() == dataDir; - assert this.delegate.getInputStream().read() == -1; + private final String httpAddr; + + /** + * Data send back from the HTraced process on the notification port. + */ + public static class StartupNotificationData { + /** + * The hostname:port pair which the HTraced process uses for HTTP requests. + */ + @JsonProperty("HttpAddr") + String httpAddr; + + /** + * The process ID of the HTraced process. + */ + @JsonProperty("ProcessId") + long processId; + } + + public HTracedProcess(final File binPath, final File dataDir, + final String host) throws IOException { + // Create a notifier socket bound to a random port. + ServerSocket listener = new ServerSocket(0); + boolean success = false; + Process process = null; + try { + // Use a random port for the web address. No 'scheme' yet. + String webAddress = host + ":0"; + String logPath = new File(dataDir, "log.txt").getAbsolutePath(); + // Pass cmdline args to htraced to it uses our test dir for data. + ProcessBuilder pb = new ProcessBuilder(binPath.toString(), + "-Dlog.level=TRACE", + "-Dlog.path=" + logPath, + "-Dweb.address=" + webAddress, + "-Ddata.store.clear=true", + "-Dstartup.notification.address=localhost:" + listener.getLocalPort(), + "-Ddata.store.directories=" + dataDir.toString()); + pb.redirectErrorStream(true); + // Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later. 
+ pb.inheritIO(); + pb.directory(dataDir); + //assert pb.redirectInput() == Redirect.PIPE; + //assert pb.redirectOutput().file() == dataDir; + process = pb.start(); + assert process.getInputStream().read() == -1; + StartupNotificationData data = readStartupNotification(listener); + httpAddr = data.httpAddr; + LOG.info("Started htraced process " + data.processId + " with http " + + "address " + data.httpAddr + ", logging to " + logPath); + success = true; + } finally { + if (!success) { + // Clean up after failure + if (process != null) { + process.destroy(); + process = null; + } + } + delegate = process; + listener.close(); + } + } + + private static StartupNotificationData + readStartupNotification(ServerSocket listener) throws IOException { + Socket socket = listener.accept(); + try { + InputStream in = socket.getInputStream(); + ObjectMapper objectMapper = new ObjectMapper(); + StartupNotificationData data = objectMapper. + readValue(in, StartupNotificationData.class); + return data; + } finally { + socket.close(); + } } public int hashCode() { @@ -91,6 +154,10 @@ public class HTracedProcess extends Process { return delegate.toString(); } + public String getHttpAddr() { + return httpAddr; + } + /** * Ugly but how else to do file-math? * @param topLevel Presumes top-level of the htrace checkout. 
@@ -100,4 +167,4 @@ public class HTracedProcess extends Process { return new File(new File(new File(new File(new File(topLevel, "htrace-core"), "src"), "go"), "build"), "htraced"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/1eaf7bd0/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java index 38f90e5..67e3a21 100644 --- a/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java +++ b/htrace-htraced/src/test/java/org/apache/htrace/util/TestHTracedProcess.java @@ -25,6 +25,8 @@ import java.io.InputStreamReader; import java.net.URL; import java.net.URLConnection; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.junit.Before; import org.junit.Test; @@ -34,6 +36,8 @@ import org.junit.Test; * in methods in the below. */ public class TestHTracedProcess { + private static final Log LOG = + LogFactory.getLog(TestHTracedProcess.class); private DataDir testDir = null; private final int TIMEOUT = 10000; @@ -51,7 +55,8 @@ public class TestHTracedProcess { connection.setReadTimeout(TIMEOUT); connection.connect(); StringBuffer sb = new StringBuffer(); - BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream())); + BufferedReader reader = new BufferedReader( + new InputStreamReader(connection.getInputStream())); try { String line = null; while ((line = reader.readLine()) != null) { @@ -70,24 +75,26 @@ public class TestHTracedProcess { * @throws InterruptedException */ @Test (timeout=10000) - public void testStartStopHTraced() throws IOException, InterruptedException { - // TODO: Make the test port random so no classes if concurrent test runs. 
Anything better - // I can do here? Pass a zero and have the daemon tell me where it is successfully listening? - String restURL = "http://localhost:9096/"; - URL restServerURL = new URL(restURL); + public void testStartStopHTraced() throws Exception { HTracedProcess htraced = null; File dataDir = this.testDir.getDataDir(); File topLevel = DataDir.getTopLevelOfCheckout(dataDir); try { - htraced = new HTracedProcess(HTracedProcess.getPathToHTraceBinaryFromTopLevel(topLevel), - dataDir, restServerURL); - String str = doGet(new URL(restServerURL + "server/info")); + htraced = new HTracedProcess(HTracedProcess. + getPathToHTraceBinaryFromTopLevel(topLevel), + dataDir, "localhost"); + LOG.info("Started HTracedProcess with REST server URL " + + htraced.getHttpAddr()); + String str = doGet(new URL( + "http://" + htraced.getHttpAddr() + "/server/info")); // Assert we go something back. assertTrue(str.contains("ReleaseVersion")); // Assert that the datadir is not empty. } finally { - if (htraced != null) htraced.destroy(); - System.out.println("ExitValue=" + htraced.exitValue()); + if (htraced != null) { + htraced.destroy(); + System.out.println("ExitValue=" + htraced.waitFor()); + } } } } \ No newline at end of file
