http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java new file mode 100644 index 0000000..8b59a72 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/PackedBufferManager.java @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.htrace.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; +import java.nio.charset.StandardCharsets; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.htrace.core.Span; + +class PackedBufferManager implements BufferManager { + private static final Log LOG = LogFactory.getLog(PackedBuffer.class); + private static final int MAX_PREQUEL_LENGTH = 2048; + private static final int METHOD_ID_WRITE_SPANS = 0x1; + private final Conf conf; + private final ByteBuffer frameBuffer; + private final PackedBuffer prequel; + private final PackedBuffer spans; + private final Selector selector; + private int numSpans; + + PackedBufferManager(Conf conf) throws IOException { + this.conf = conf; + this.frameBuffer = ByteBuffer.allocate(PackedBuffer.HRPC_REQ_FRAME_LENGTH); + this.prequel = new PackedBuffer(ByteBuffer.allocate(MAX_PREQUEL_LENGTH)); + this.spans = new PackedBuffer(ByteBuffer.allocate(conf.bufferSize)); + this.selector = SelectorProvider.provider().openSelector(); + clear(); + } + + @Override + public void writeSpan(Span span) throws IOException { + spans.writeSpan(span); + numSpans++; + if (LOG.isTraceEnabled()) { + LOG.trace("wrote " + span.toJson() + " to PackedBuffer for " + + conf.endpointStr + ". 
numSpans = " + numSpans + + ", buffer position = " + spans.getBuffer().position()); + } + } + + @Override + public int contentLength() { + return spans.getBuffer().position(); + } + + @Override + public int getNumberOfSpans() { + return numSpans; + } + + @Override + public void prepare() throws IOException { + prequel.beginWriteSpansRequest(null, numSpans); + long totalLength = + prequel.getBuffer().position() + spans.getBuffer().position(); + if (totalLength > PackedBuffer.MAX_HRPC_BODY_LENGTH) { + throw new IOException("Can't send RPC of " + totalLength + " bytes " + + "because it is longer than " + PackedBuffer.MAX_HRPC_BODY_LENGTH); + } + PackedBuffer.writeReqFrame(frameBuffer, + METHOD_ID_WRITE_SPANS, 1, (int)totalLength); + frameBuffer.flip(); + prequel.getBuffer().flip(); + spans.getBuffer().flip(); + if (LOG.isTraceEnabled()) { + LOG.trace("Preparing to send RPC of length " + + (totalLength + PackedBuffer.HRPC_REQ_FRAME_LENGTH) + " to " + + conf.endpointStr + ", containing " + numSpans + " spans."); + } + } + + @Override + public void flush() throws IOException { + SelectionKey sockKey = null; + IOException ioe = null; + frameBuffer.position(0); + prequel.getBuffer().position(0); + spans.getBuffer().position(0); + if (LOG.isTraceEnabled()) { + LOG.trace("Preparing to flush " + numSpans + " spans to " + + conf.endpointStr); + } + try { + sockKey = doConnect(); + doSend(sockKey, new ByteBuffer[] { + frameBuffer, prequel.getBuffer(), spans.getBuffer() }); + ByteBuffer response = prequel.getBuffer(); + readAndValidateResponseFrame(sockKey, response, + 1, METHOD_ID_WRITE_SPANS); + } catch (IOException e) { + // This LOG message is only at debug level because we also log these + // exceptions at error level inside HTracedReceiver. The logging in + // HTracedReceiver is rate-limited to avoid overwhelming the client log + // if htraced goes down. The debug and trace logging is not + // rate-limited. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Got exception during flush", e); + } + ioe = e; + } finally { + if (sockKey != null) { + sockKey.cancel(); + try { + SocketChannel sock = (SocketChannel)sockKey.attachment(); + sock.close(); + } catch (IOException e) { + if (ioe != null) { + ioe.addSuppressed(e); + } + } + } + } + if (ioe != null) { + throw ioe; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Successfully flushed " + numSpans + " spans to " + + conf.endpointStr); + } + } + + private long updateRemainingMs(long startMs, long timeoutMs) { + long deltaMs = TimeUtil.deltaMs(startMs, TimeUtil.nowMs()); + if (deltaMs > timeoutMs) { + return 0; + } + return timeoutMs - deltaMs; + } + + private SelectionKey doConnect() throws IOException { + SocketChannel sock = SocketChannel.open(); + SelectionKey sockKey = null; + boolean success = false; + try { + if (sock.isBlocking()) { + sock.configureBlocking(false); + } + InetSocketAddress resolvedEndpoint = + new InetSocketAddress(conf.endpoint.getHostString(), + conf.endpoint.getPort()); + resolvedEndpoint.getHostName(); // trigger DNS resolution + sock.connect(resolvedEndpoint); + sockKey = sock.register(selector, SelectionKey.OP_CONNECT, sock); + long startMs = TimeUtil.nowMs(); + long remainingMs = conf.connectTimeoutMs; + while (true) { + selector.select(remainingMs); + for (SelectionKey key : selector.keys()) { + if (key.isConnectable()) { + SocketChannel s = (SocketChannel)key.attachment(); + s.finishConnect(); + if (LOG.isTraceEnabled()) { + LOG.trace("Successfully connected to " + conf.endpointStr + "."); + } + success = true; + return sockKey; + } + } + remainingMs = updateRemainingMs(startMs, conf.connectTimeoutMs); + if (remainingMs == 0) { + throw new IOException("Attempt to connect to " + conf.endpointStr + + " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) + + " ms."); + } + } + } finally { + if (!success) { + if (sockKey != null) { + sockKey.cancel(); + } + sock.close(); + } + } + } + + /** 
+ * Send the provided ByteBuffer objects. + * + * We use non-blocking I/O because Java does not provide write timeouts. + * Without a write timeout, the socket could get hung and we'd never recover. + * We also use the GatheringByteChannel#write method which calls the pread() + * system call under the covers. This ensures that even if TCP_NODELAY is on, + * we send the minimal number of packets. + */ + private void doSend(SelectionKey sockKey, ByteBuffer[] bufs) + throws IOException { + long totalWritten = 0; + sockKey.interestOps(SelectionKey.OP_WRITE); + SocketChannel sock = (SocketChannel)sockKey.attachment(); + long startMs = TimeUtil.nowMs(); + long remainingMs = conf.ioTimeoutMs; + while (true) { + selector.select(remainingMs); + int firstBuf = 0; + for (SelectionKey key : selector.selectedKeys()) { + if (key.isWritable()) { + long written = sock.write(bufs, firstBuf, bufs.length - firstBuf); + if (LOG.isTraceEnabled()) { + LOG.trace("Sent " + written + " bytes to " + conf.endpointStr); + } + totalWritten += written; + } + } + while (true) { + if (firstBuf == bufs.length) { + if (LOG.isTraceEnabled()) { + LOG.trace("Finished sending " + totalWritten + " bytes to " + + conf.endpointStr); + } + return; + } + if (bufs[firstBuf].remaining() > 0) { + break; + } + firstBuf++; + } + remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs); + if (remainingMs == 0) { + throw new IOException("Attempt to write to " + conf.endpointStr + + " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) + + " ms."); + } + } + } + + private void doRecv(SelectionKey sockKey, ByteBuffer response) + throws IOException { + sockKey.interestOps(SelectionKey.OP_READ); + SocketChannel sock = (SocketChannel)sockKey.attachment(); + int totalRead = response.remaining(); + long startMs = TimeUtil.nowMs(); + long remainingMs = conf.ioTimeoutMs; + while (remainingMs > 0) { + selector.select(remainingMs); + for (SelectionKey key : selector.selectedKeys()) { + if (key.isReadable()) { + 
sock.read(response); + } + } + if (response.remaining() == 0) { + if (LOG.isTraceEnabled()) { + LOG.trace("Received all " + totalRead + " bytes from " + + conf.endpointStr); + } + return; + } + remainingMs = updateRemainingMs(startMs, conf.ioTimeoutMs); + if (LOG.isTraceEnabled()) { + LOG.trace("Received " + (totalRead - response.remaining()) + + " out of " + totalRead + " bytes from " + conf.endpointStr); + } + if (remainingMs == 0) { + throw new IOException("Attempt to write to " + conf.endpointStr + + " timed out after " + TimeUtil.deltaMs(startMs, TimeUtil.nowMs()) + + " ms."); + } + } + } + + private void readAndValidateResponseFrame(SelectionKey sockKey, + ByteBuffer buf, long expectedSeq, int expectedMethodId) + throws IOException { + buf.clear(); + buf.limit(PackedBuffer.HRPC_RESP_FRAME_LENGTH); + doRecv(sockKey, buf); + buf.flip(); + buf.order(ByteOrder.LITTLE_ENDIAN); + long seq = buf.getLong(); + if (seq != expectedSeq) { + throw new IOException("Expected sequence number " + expectedSeq + + ", but got sequence number " + seq); + } + int methodId = buf.getInt(); + if (expectedMethodId != methodId) { + throw new IOException("Expected method id " + expectedMethodId + + ", but got " + methodId); + } + int errorLength = buf.getInt(); + buf.getInt(); + if ((errorLength < 0) || + (errorLength > PackedBuffer.MAX_HRPC_ERROR_LENGTH)) { + throw new IOException("Got server error with invalid length " + + errorLength); + } else if (errorLength > 0) { + buf.clear(); + buf.limit(errorLength); + doRecv(sockKey, buf); + buf.flip(); + CharBuffer charBuf = StandardCharsets.UTF_8.decode(buf); + String serverErrorStr = charBuf.toString(); + throw new IOException("Got server error " + serverErrorStr); + } + } + + @Override + public void clear() { + frameBuffer.clear(); + prequel.getBuffer().clear(); + spans.getBuffer().clear(); + numSpans = 0; + } + + @Override + public void close() { + clear(); + prequel.close(); + spans.close(); + try { + selector.close(); + } catch 
(IOException e) { + LOG.warn("Error closing selector", e); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java new file mode 100644 index 0000000..ac42ee8 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RateLimitedLogger.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; + +/** + * A logger which rate-limits its logging to a configurable level. 
+ */ +class RateLimitedLogger { + private final Log log; + private final long timeoutMs; + private long lastLogTimeMs; + + public RateLimitedLogger(Log log, long timeoutMs) { + this.log = log; + this.timeoutMs = timeoutMs; + synchronized (this) { + this.lastLogTimeMs = 0L; + } + } + + public void warn(String what) { + long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), + TimeUnit.NANOSECONDS); + synchronized (this) { + if (now >= lastLogTimeMs + timeoutMs) { + log.warn(what); + lastLogTimeMs = now; + } + } + } + + public void error(String what) { + long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), + TimeUnit.NANOSECONDS); + synchronized (this) { + if (now >= lastLogTimeMs + timeoutMs) { + log.error(what); + lastLogTimeMs = now; + } + } + } + + public void error(String what, Throwable e) { + long now = TimeUnit.MILLISECONDS.convert(System.nanoTime(), + TimeUnit.NANOSECONDS); + synchronized (this) { + if (now >= lastLogTimeMs + timeoutMs) { + log.error(what, e); + lastLogTimeMs = now; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java new file mode 100644 index 0000000..2e1aa70 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/RestBufferManager.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.io.IOException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.htrace.core.Span; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentProvider; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; + +class RestBufferManager implements BufferManager { + private static final Log LOG = LogFactory.getLog(RestBufferManager.class); + private static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static ObjectWriter JSON_WRITER = OBJECT_MAPPER.writer(); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final byte COMMA_BYTE = (byte)0x2c; + private static final int MAX_PREQUEL_LENGTH = 512; + private static final int MAX_EPILOGUE_LENGTH = 32; + private final Conf conf; + private final HttpClient httpClient; + private final String urlString; + private final ByteBuffer prequel; + private final ByteBuffer 
spans; + private final ByteBuffer epilogue; + private int numSpans; + + private static class RestBufferManagerContentProvider + implements ContentProvider { + private final ByteBuffer[] bufs; + + private class ByteBufferIterator implements Iterator<ByteBuffer> { + private int bufIdx = -1; + + @Override + public boolean hasNext() { + return (bufIdx + 1) < bufs.length; + } + + @Override + public ByteBuffer next() { + if ((bufIdx + 1) >= bufs.length) { + throw new NoSuchElementException(); + } + bufIdx++; + return bufs[bufIdx]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + RestBufferManagerContentProvider(ByteBuffer[] bufs) { + this.bufs = bufs; + } + + @Override + public long getLength() { + long total = 0; + for (int i = 0; i < bufs.length; i++) { + total += bufs[i].remaining(); + } + return total; + } + + @Override + public Iterator<ByteBuffer> iterator() { + return new ByteBufferIterator(); + } + } + + /** + * Create an HttpClient instance. + * + * @param connTimeout The timeout to use for connecting. + * @param idleTimeout The idle timeout to use. 
+ */ + static HttpClient createHttpClient(long connTimeout, long idleTimeout) { + HttpClient httpClient = new HttpClient(); + httpClient.setUserAgentField( + new HttpField(HttpHeader.USER_AGENT, "HTracedSpanReceiver")); + httpClient.setConnectTimeout(connTimeout); + httpClient.setIdleTimeout(idleTimeout); + return httpClient; + } + + RestBufferManager(Conf conf) throws Exception { + this.conf = conf; + this.httpClient = + createHttpClient(conf.connectTimeoutMs, conf.idleTimeoutMs); + this.urlString = new URL("http", conf.endpoint.getHostName(), + conf.endpoint.getPort(), "/writeSpans").toString(); + this.prequel = ByteBuffer.allocate(MAX_PREQUEL_LENGTH); + this.spans = ByteBuffer.allocate(conf.bufferSize); + this.epilogue = ByteBuffer.allocate(MAX_EPILOGUE_LENGTH); + clear(); + this.httpClient.start(); + } + + @Override + public void writeSpan(Span span) throws IOException { + byte[] spanJsonBytes = JSON_WRITER.writeValueAsBytes(span); + if ((spans.capacity() - spans.position()) < (spanJsonBytes.length + 1)) { + // Make sure we have enough space for the span JSON and a comma. + throw new IOException("Not enough space remaining in span buffer."); + } + spans.put(COMMA_BYTE); + spans.put(spanJsonBytes); + numSpans++; + } + + @Override + public int contentLength() { + return Math.max(spans.position() - 1, 0); + } + + @Override + public int getNumberOfSpans() { + return numSpans; + } + + @Override + public void prepare() throws IOException { + String prequelString = "{\"Spans\":["; + prequel.put(prequelString.getBytes(UTF8)); + prequel.flip(); + + spans.flip(); + + String epilogueString = "]}"; + epilogue.put(epilogueString.toString().getBytes(UTF8)); + epilogue.flip(); + + if (LOG.isTraceEnabled()) { + LOG.trace("Preparing to send " + contentLength() + " bytes of span " + + "data to " + conf.endpointStr + ", containing " + numSpans + + " spans."); + } + } + + @Override + public void flush() throws IOException { + // Position the buffers at the beginning. 
+ prequel.position(0); + spans.position(spans.limit() == 0 ? 0 : 1); // Skip the first comma + epilogue.position(0); + + RestBufferManagerContentProvider contentProvider = + new RestBufferManagerContentProvider( + new ByteBuffer[] { prequel, spans, epilogue }); + long rpcLength = contentProvider.getLength(); + try { + Request request = httpClient. + newRequest(urlString).method(HttpMethod.POST); + request.header(HttpHeader.CONTENT_TYPE, "application/json"); + request.content(contentProvider); + ContentResponse response = request.send(); + if (response.getStatus() != HttpStatus.OK_200) { + throw new IOException("Got back error response " + + response.getStatus() + " from " + conf.endpointStr + "; " + + response.getContentAsString()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sent WriteSpansReq of length " + rpcLength + " to " + conf.endpointStr); + } + } catch (InterruptedException e) { + throw new IOException("Interrupted while sending spans via REST", e); + } catch (TimeoutException e) { + throw new IOException("Timed out sending spans via REST", e); + } catch (ExecutionException e) { + throw new IOException("Execution exception sending spans via REST", e); + } + } + + @Override + public void clear() { + prequel.clear(); + spans.clear(); + epilogue.clear(); + numSpans = 0; + } + + @Override + public void close() { + try { + httpClient.stop(); + } catch (Exception e) { + LOG.error("Error stopping HTracedReceiver httpClient", e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java new file mode 100644 index 0000000..7361c97 --- /dev/null +++ b/htrace-htraced/src/main/java/org/apache/htrace/impl/TimeUtil.java @@ -0,0 +1,78 @@ +/** + * Licensed to the 
/**
 * Utilities for dealing with monotonic time.
 */
final class TimeUtil {
  /** Long.MAX_VALUE, used to clamp nanosecond deltas into the long range. */
  private static final BigInteger MAX_LONG_NS =
      BigInteger.valueOf(Long.MAX_VALUE);

  /**
   * Backwards jumps smaller than this many nanoseconds are treated as clock
   * jitter; larger ones are treated as the clock having wrapped around.
   */
  private static final BigInteger MAX_JITTER_NS =
      BigInteger.valueOf(Long.MAX_VALUE / 2);

  /** The number of distinct values of a 64-bit nanosecond counter. */
  private static final BigInteger CLOCK_PERIOD_NS =
      BigInteger.ONE.shiftLeft(64);

  /** Static utility class; not meant to be instantiated. */
  private TimeUtil() {
  }

  /**
   * Returns the current monotonic time in milliseconds.
   */
  static long nowMs() {
    return TimeUnit.MILLISECONDS.convert(
        System.nanoTime(), TimeUnit.NANOSECONDS);
  }

  /**
   * Get the approximate delta between two monotonic times.
   *
   * This function makes the following assumptions:
   * 1. We read startMs from the monotonic clock prior to endMs.
   * 2. The two times are not more than 100 years or so apart.
   *
   * With these two assumptions in hand, we can smooth over some of the
   * unpleasant features of the monotonic clock:
   * 1. It can return either positive or negative values.
   * 2. When the number of nanoseconds reaches Long.MAX_VALUE it wraps around
   * to Long.MIN_VALUE.
   * 3. On some messed up systems it has been known to jump backwards every
   * now and then.  Oops.  CPU core synchronization mumble mumble.
   *
   * @param startMs  The start time.
   * @param endMs    The end time.
   * @return         The delta between the two times, in milliseconds.
   *                 Never negative.
   */
  static long deltaMs(long startMs, long endMs) {
    BigInteger startNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
        convert(startMs, TimeUnit.MILLISECONDS));
    BigInteger endNs = BigInteger.valueOf(TimeUnit.NANOSECONDS.
        convert(endMs, TimeUnit.MILLISECONDS));
    BigInteger deltaNs = endNs.subtract(startNs);
    if (deltaNs.signum() >= 0) {
      return clampedNsToMs(deltaNs);
    }
    BigInteger backwardsNs = deltaNs.negate();
    if (backwardsNs.compareTo(MAX_JITTER_NS) < 0) {
      // 'endNs' is numerically less than 'startNs', but only by a modest
      // amount, so it's probably just clock jitter.  Certain old OSes and
      // CPUs had monotonic clocks that could go backwards under certain
      // conditions (ironic, given the name).
      return 0L;
    }
    // Handle rollover: the clock wrapped between the two readings, so the
    // real elapsed time is the clock period minus the apparent step back.
    return clampedNsToMs(CLOCK_PERIOD_NS.subtract(backwardsNs));
  }

  /**
   * Convert a non-negative nanosecond count to milliseconds, clamping it to
   * Long.MAX_VALUE nanoseconds first.
   */
  private static long clampedNsToMs(BigInteger ns) {
    return TimeUnit.MILLISECONDS.convert(
        ns.min(MAX_LONG_NS).longValue(), TimeUnit.NANOSECONDS);
  }
}
/**
 * Test helper which creates a fresh, uniquely named data directory.
 *
 * The directory is created at <code>&lt;base&gt;/test-data/&lt;UUID&gt;</code>,
 * where the base is taken from the <code>test.data.base.dir</code> system
 * property and defaults to <code>target</code>.  Create one instance per test
 * run and hand the result of {@link #get()} to whatever needs a place to
 * write its data.
 */
public class DataDir implements Closeable {
  private final File dir;

  /**
   * Create the directory, along with any missing parent directories.
   *
   * @throws IOException  If the directory cannot be created.
   */
  public DataDir() throws IOException {
    File root = new File(System.getProperty("test.data.base.dir", "target"));
    File parent = new File(root, "test-data");
    String uniqueName = UUID.randomUUID().toString();
    this.dir = new File(parent, uniqueName);
    Files.createDirectories(this.dir.toPath());
  }

  /**
   * @return  The directory that was created by the constructor.
   */
  public File get() {
    return dir;
  }

  /**
   * Does nothing.  The original cleanup, which deleted every file in the
   * directory and then the directory itself, is disabled, so the data stays
   * on disk after the test.
   */
  @Override
  public void close() throws IOException {
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("DataDir{");
    text.append(dir.getAbsolutePath());
    text.append("}");
    return text.toString();
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.nio.file.Paths; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.core.MilliSpan; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http.HttpStatus; +import org.junit.Assert; + +/** + * To get instance of HTraced up and running, create an instance of this class. + * Upon successful construction, htraced is running using <code>dataDir</code> as directory to + * host data (leveldbs and logs). + */ +class HTracedProcess extends Process { + private static final Log LOG = LogFactory.getLog(HTracedProcess.class); + + static class Builder { + String host = "localhost"; + + Builder() { + } + + Builder host(String host) { + this.host = host; + return this; + } + + HTracedProcess build() throws Exception { + return new HTracedProcess(this); + } + } + + /** + * Path to the htraced binary. + */ + private final File htracedPath; + + /** + * Temporary directory for test files. + */ + private final DataDir dataDir; + + /** + * The Java Process object for htraced. + */ + private final Process delegate; + + /** + * The HTTP host:port returned from htraced. + */ + private final String httpAddr; + + /** + * The HRPC host:port returned from htraced. + */ + private final String hrpcAddr; + + /** + * REST client to use to talk to htraced. 
+ */ + private final HttpClient httpClient; + + /** + * Data send back from the HTraced process on the notification port. + */ + @JsonIgnoreProperties(ignoreUnknown = true) + public static class StartupNotificationData { + /** + * The hostname:port pair which the HTraced process uses for HTTP requests. + */ + @JsonProperty("HttpAddr") + String httpAddr; + + /** + * The hostname:port pair which the HTraced process uses for HRPC requests. + */ + @JsonProperty("HrpcAddr") + String hrpcAddr; + + /** + * The process ID of the HTraced process. + */ + @JsonProperty("ProcessId") + long processId; + } + + private HTracedProcess(Builder builder) throws Exception { + this.htracedPath = Paths.get( + "target", "..", "go", "build", "htraced").toFile(); + if (!this.htracedPath.exists()) { + throw new RuntimeException("No htraced binary exists at " + + this.htracedPath); + } + this.dataDir = new DataDir(); + // Create a notifier socket bound to a random port. + ServerSocket listener = new ServerSocket(0); + boolean success = false; + Process process = null; + HttpClient http = null; + try { + // Use a random port for the web address. No 'scheme' yet. + String random = builder.host + ":0"; + String logPath = new File(dataDir.get(), "log.txt").getAbsolutePath(); + // Pass cmdline args to htraced to it uses our test dir for data. + ProcessBuilder pb = new ProcessBuilder(htracedPath.getAbsolutePath(), + "-Dlog.level=TRACE", + "-Dlog.path=" + logPath, + "-Dweb.address=" + random, + "-Dhrpc.address=" + random, + "-Ddata.store.clear=true", + "-Dstartup.notification.address=localhost:" + listener.getLocalPort(), + "-Ddata.store.directories=" + dataDir.get().getAbsolutePath()); + pb.redirectErrorStream(true); + // Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later. 
+ pb.inheritIO(); + pb.directory(dataDir.get()); + //assert pb.redirectInput() == Redirect.PIPE; + //assert pb.redirectOutput().file() == dataDir; + process = pb.start(); + assert process.getInputStream().read() == -1; + StartupNotificationData data = readStartupNotification(listener); + httpAddr = data.httpAddr; + hrpcAddr = data.hrpcAddr; + LOG.info("Started htraced process " + data.processId + " with http " + + "address " + data.httpAddr + ", logging to " + logPath); + http = RestBufferManager.createHttpClient(60000L, 60000L); + http.start(); + success = true; + } finally { + if (!success) { + // Clean up after failure + if (process != null) { + process.destroy(); + process = null; + } + if (http != null) { + http.stop(); + } + } + delegate = process; + listener.close(); + httpClient = http; + } + } + + private static StartupNotificationData + readStartupNotification(ServerSocket listener) throws IOException { + Socket socket = listener.accept(); + try { + InputStream in = socket.getInputStream(); + ObjectMapper objectMapper = new ObjectMapper(); + StartupNotificationData data = objectMapper. 
+ readValue(in, StartupNotificationData.class); + return data; + } finally { + socket.close(); + } + } + + public int hashCode() { + return delegate.hashCode(); + } + + public OutputStream getOutputStream() { + throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT"); + } + + public InputStream getInputStream() { + throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT"); + } + + public boolean equals(Object obj) { + return delegate.equals(obj); + } + + public InputStream getErrorStream() { + throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT"); + } + + public int waitFor() throws InterruptedException { + return delegate.waitFor(); + } + + public int exitValue() { + return delegate.exitValue(); + } + + public void destroy() { + try { + httpClient.stop(); + } catch (Exception e) { + LOG.error("Error stopping httpClient", e); + } + delegate.destroy(); + try { + dataDir.close(); + } catch (Exception e) { + LOG.error("Error closing " + dataDir, e); + } + LOG.trace("Destroyed htraced process."); + } + + public String toString() { + return delegate.toString(); + } + + public String getHttpAddr() { + return httpAddr; + } + + public String getHrpcAddr() { + return hrpcAddr; + } + + /** + * Ugly but how else to do file-math? + * @param topLevel Presumes top-level of the htrace checkout. + * @return Path to the htraced binary. 
+ */ + public static File getPathToHTraceBinaryFromTopLevel(final File topLevel) { + return new File(new File(new File(new File(topLevel, "htrace-htraced"), "go"), "build"), + "htraced"); + } + + public String getServerInfoJson() throws Exception { + ContentResponse response = httpClient.GET( + new URI(String.format("http://%s/server/info", httpAddr))); + Assert.assertEquals("application/json", response.getMediaType()); + Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); + return response.getContentAsString(); + } + + public Span getSpan(SpanId spanId) throws Exception { + ContentResponse response = httpClient.GET( + new URI(String.format("http://%s/span/%s", + httpAddr, spanId.toString()))); + Assert.assertEquals("application/json", response.getMediaType()); + String responseJson = response.getContentAsString().trim(); + if (responseJson.isEmpty()) { + return null; + } + return MilliSpan.fromJson(responseJson); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java deleted file mode 100644 index d52f071..0000000 --- a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedRESTReceiver.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.htrace.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.net.URL; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.htrace.core.HTraceConfiguration; -import org.apache.htrace.core.MilliSpan; -import org.apache.htrace.core.Span; -import org.apache.htrace.core.SpanId; -import org.apache.htrace.core.TracerId; -import org.apache.htrace.util.DataDir; -import org.apache.htrace.util.HTracedProcess; -import org.apache.htrace.util.TestUtil; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.http.HttpStatus; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class TestHTracedRESTReceiver { - private static final Log LOG = - LogFactory.getLog(TestHTracedRESTReceiver.class); - private URL restServerUrl; - private DataDir dataDir; - HTracedProcess htraced; - - @Before - public void setUp() throws Exception { - this.dataDir = new DataDir(); - File tlDir = DataDir.getTopLevelOfCheckout(this.dataDir.getDataDir()); - File pathToHTracedBinary = HTracedProcess.getPathToHTraceBinaryFromTopLevel(tlDir); - this.htraced = new HTracedProcess(pathToHTracedBinary, - dataDir.getDataDir(), "localhost"); - this.restServerUrl = new URL("http://" + htraced.getHttpAddr() + "/"); - } - - @After - public void tearDown() throws Exception { - if (this.htraced != null) 
this.htraced.destroy(); - } - - /** - * Our simple version of htrace configuration for testing. - */ - private final class TestHTraceConfiguration extends HTraceConfiguration { - private final URL restServerUrl; - final static String TRACER_ID = "TestHTracedRESTReceiver"; - - public TestHTraceConfiguration(final URL restServerUrl) { - this.restServerUrl = restServerUrl; - } - - @Override - public String get(String key) { - return null; - } - - @Override - public String get(String key, String defaultValue) { - if (key.equals(HTracedRESTReceiver.HTRACED_REST_URL_KEY)) { - return this.restServerUrl.toString(); - } else if (key.equals(TracerId.TRACER_ID_KEY)) { - return TRACER_ID; - } - return defaultValue; - } - } - - /** - * Make sure the REST server basically works. - * @throws Exception - */ - @Test (timeout = 10000) - public void testBasicGet() throws Exception { - HTracedRESTReceiver receiver = - new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl)); - HttpClient http = receiver.createHttpClient(60000L, 60000L); - http.start(); - try { - // Do basic a GET /server/info against htraced - ContentResponse response = - http.GET(restServerUrl + "server/info"); - assertEquals("application/json", response.getMediaType()); - String content = processGET(response); - assertTrue(content.contains("ReleaseVersion")); - System.out.println(content); - } finally { - http.stop(); - receiver.close(); - } - } - - private String processGET(final ContentResponse response) { - assertTrue("" + response.getStatus(), HttpStatus.OK_200 <= response.getStatus() && - response.getStatus() <= HttpStatus.NO_CONTENT_204); - return response.getContentAsString(); - } - - private void testSendingSpansImpl(boolean testClose) throws Exception { - final HTracedRESTReceiver receiver = - new HTracedRESTReceiver(new TestHTraceConfiguration(this.restServerUrl)); - final int NUM_SPANS = 3; - final HttpClient http = receiver.createHttpClient(60000, 60000); - http.start(); - Span spans[] = 
new Span[NUM_SPANS]; - for (int i = 0; i < NUM_SPANS; i++) { - MilliSpan.Builder builder = new MilliSpan.Builder(). - parents(new SpanId[] { new SpanId(1L, 1L) }). - spanId(new SpanId(1L, i)); - if (i == NUM_SPANS - 1) { - builder.tracerId("specialTrid"); - } else { - builder.tracerId(TestHTraceConfiguration.TRACER_ID); - } - spans[i] = builder.build(); - } - try { - for (int i = 0; i < NUM_SPANS; i++) { - LOG.info("receiving " + spans[i].toString()); - receiver.receiveSpan(spans[i]); - } - if (testClose) { - receiver.close(); - } else { - receiver.startFlushing(); - } - TestUtil.waitFor(new TestUtil.Supplier<Boolean>() { - @Override - public Boolean get() { - try { - for (int i = 0; i < NUM_SPANS; i++) { - // This is what the REST server expects when querying for a - // span id. - String findSpan = String.format("span/%s", - new SpanId(1L, i).toString()); - ContentResponse response = - http.GET(restServerUrl + findSpan); - String content = processGET(response); - if ((content == null) || (content.length() == 0)) { - LOG.info("Failed to find span " + i); - return false; - } - LOG.info("Got " + content + " for span " + i); - MilliSpan dspan = MilliSpan.fromJson(content); - assertEquals(new SpanId(1, i).toString(), - dspan.getSpanId().toString()); - // Every span should have the tracer ID we set in the - // configuration... except for the last span, which had - // a custom value set. - if (i == NUM_SPANS - 1) { - assertEquals("specialTrid", dspan.getTracerId()); - } else { - assertEquals(TestHTraceConfiguration.TRACER_ID, - dspan.getTracerId()); - } - } - return true; - } catch (Throwable t) { - LOG.error("Got exception", t); - return false; - } - } - }, 10, 20000); - } finally { - http.stop(); - if (!testClose) { - receiver.close(); - } - } - } - - /** - * Send 100 spans then confirm they made it in. 
- * @throws Exception - */ - @Test (timeout = 60000) - public void testSendingSpans() throws Exception { - testSendingSpansImpl(false); - } - - /** - * Test that the REST receiver blocks during shutdown until all spans are sent - * (or a long timeout elapses). Otherwise, short-lived client processes will - * never have a chance to send all their spans and we will have incomplete - * information. - */ - @Test (timeout = 60000) - public void testShutdownBlocksUntilSpanAreSent() throws Exception { - testSendingSpansImpl(true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java new file mode 100644 index 0000000..99f00a1 --- /dev/null +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiver.java @@ -0,0 +1,572 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.htrace.impl; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Random; +import java.util.concurrent.Semaphore; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.core.HTraceConfiguration; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TracerId; +import org.apache.htrace.impl.HTracedSpanReceiver.FaultInjector; +import org.apache.htrace.util.TestUtil; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +public class TestHTracedReceiver { + private static final Log LOG = LogFactory.getLog(TestHTracedReceiver.class); + + @BeforeClass + public static void beforeClass() { + // Allow setting really small buffer sizes for testing purposes. + // We do not allow setting such small sizes in production. 
+ Conf.BUFFER_SIZE_MIN = 0; + } + + @Rule + public TestRule watcher = new TestWatcher() { + protected void starting(Description description) { + LOG.info("*** Starting junit test: " + description.getMethodName()); + } + + protected void finished(Description description) { + LOG.info("*** Finished junit test: " + description.getMethodName()); + } + }; + + @Test(timeout = 60000) + public void testGetServerInfoJson() throws Exception { + HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + String response = ht.getServerInfoJson(); + assertTrue(response.contains("ReleaseVersion")); + } finally { + ht.destroy(); + } + } + + private void waitForSpans(final HTracedProcess ht, Span[] spans) + throws Exception { + waitForSpans(ht, spans, spans.length); + } + + private void waitForSpans(final HTracedProcess ht, Span[] spans, + int numSpans) throws Exception { + final LinkedList<SpanId> spanIds = new LinkedList<SpanId>(); + for (int i = 0; i < numSpans; i++) { + spanIds.add(spans[i].getSpanId()); + } + boolean success = false; + try { + TestUtil.waitFor(new TestUtil.Supplier<Boolean>() { + @Override + public Boolean get() { + for (Iterator<SpanId> iter = spanIds.iterator(); + iter.hasNext(); ) { + SpanId spanId = iter.next(); + try { + if (ht.getSpan(spanId) == null) { + return false; + } + } catch (InterruptedException e) { + LOG.error("Got InterruptedException while looking for " + + "span ID " + spanId, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.error("Got error looking for span ID " + spanId, e); + return false; + } + iter.remove(); + } + return true; + } + }, 10, 30000); + success = true; + } finally { + if (!success) { + String prefix = ""; + StringBuilder idStringBld = new StringBuilder(); + for (Iterator<SpanId> iter = spanIds.iterator(); + iter.hasNext(); ) { + idStringBld.append(prefix); + idStringBld.append(iter.next()); + prefix = ","; + } + LOG.error("Unable to find span IDs " + idStringBld.toString()); + } + } + } + 
+ /** + * Test that we can send spans via the HRPC interface. + */ + @Test(timeout = 10000) //60000) + public void testSendSpansViaPacked() throws Exception { + final Random rand = new Random(123); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testSendSpansViaPacked"); + put(Conf.ADDRESS_KEY, ht.getHrpcAddr()); + put(Conf.PACKED_KEY, "true"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100"); + put(Conf.ERROR_LOG_PERIOD_MS_KEY, "0"); + }}); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf); + Span[] spans = TestUtil.randomSpans(rand, 10); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + waitForSpans(ht, spans); + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * Test that when the SpanReceiver is closed, we send any spans we have + * buffered via the HRPC interface. + */ + @Test(timeout = 60000) + public void testSendSpansViaPackedAndClose() throws Exception { + final Random rand = new Random(456); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testSendSpansViaPackedAndClose"); + put(Conf.ADDRESS_KEY, ht.getHrpcAddr()); + put(Conf.PACKED_KEY, "true"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000"); + }}); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf); + Span[] spans = TestUtil.randomSpans(rand, 10); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + rcvr.close(); + waitForSpans(ht, spans); + } finally { + ht.destroy(); + } + } + + /** + * Test that we can send spans via the REST interface. 
+ */ + @Test(timeout = 60000) + public void testSendSpansViaRest() throws Exception { + final Random rand = new Random(789); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testSendSpansViaRest"); + put(Conf.ADDRESS_KEY, ht.getHttpAddr()); + put(Conf.PACKED_KEY, "false"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "100"); + }}); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf); + Span[] spans = TestUtil.randomSpans(rand, 10); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + waitForSpans(ht, spans); + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * Test that when the SpanReceiver is closed, we send any spans we have + * buffered via the REST interface. + */ + @Test(timeout = 60000) + public void testSendSpansViaRestAndClose() throws Exception { + final Random rand = new Random(321); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testSendSpansViaRestAndClose"); + put(Conf.ADDRESS_KEY, ht.getHttpAddr()); + put(Conf.PACKED_KEY, "false"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "60000"); + }}); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf); + Span[] spans = TestUtil.randomSpans(rand, 10); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + rcvr.close(); + waitForSpans(ht, spans); + } finally { + ht.destroy(); + } + } + + private static class Mutable<T> { + private T t; + + Mutable(T t) { + this.t = t; + } + + void set(T t) { + this.t = t; + } + + T get() { + return this.t; + } + } + + private static class TestHandleContentLengthTriggerInjector + extends HTracedSpanReceiver.FaultInjector { + final Semaphore threadStartSem = new Semaphore(0); + int contentLengthOnTrigger = 0; + + @Override + public synchronized void 
handleContentLengthTrigger(int len) { + contentLengthOnTrigger = len; + } + @Override + public void handleThreadStart() throws Exception { + threadStartSem.acquire(); + } + + public synchronized int getContentLengthOnTrigger() { + return contentLengthOnTrigger; + } + } + + /** + * Test that filling up one of the buffers causes us to trigger a flush and + * start using the other buffer, when using PackedBufferManager. + * This also tests that PackedBufferManager can correctly handle a buffer + * getting full. + */ + @Test(timeout = 60000) + public void testFullBufferCausesPackedThreadTrigger() throws Exception { + final Random rand = new Random(321); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, + "testFullBufferCausesPackedThreadTrigger"); + put(Conf.ADDRESS_KEY, ht.getHrpcAddr()); + put(Conf.PACKED_KEY, "true"); + put(Conf.BUFFER_SIZE_KEY, "16384"); + put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95"); + }}); + TestHandleContentLengthTriggerInjector injector = + new TestHandleContentLengthTriggerInjector(); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 47); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + Assert.assertTrue("The wakePostSpansThread should have been " + + "triggered by the spans added so far. " + + "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(), + injector.getContentLengthOnTrigger() > 16000); + injector.threadStartSem.release(); + rcvr.close(); + waitForSpans(ht, spans, 45); + } finally { + ht.destroy(); + } + } + + /** + * Test that filling up one of the buffers causes us to trigger a flush and + * start using the other buffer, when using RestBufferManager. + * This also tests that RestBufferManager can correctly handle a buffer + * getting full. 
+ */ + @Test(timeout = 60000) + public void testFullBufferCausesRestThreadTrigger() throws Exception { + final Random rand = new Random(321); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, + "testFullBufferCausesRestThreadTrigger"); + put(Conf.ADDRESS_KEY, ht.getHttpAddr()); + put(Conf.PACKED_KEY, "false"); + put(Conf.BUFFER_SIZE_KEY, "16384"); + put(Conf.BUFFER_SEND_TRIGGER_FRACTION_KEY, "0.95"); + }}); + TestHandleContentLengthTriggerInjector injector = + new TestHandleContentLengthTriggerInjector(); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 34); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + Assert.assertTrue("The wakePostSpansThread should have been " + + "triggered by the spans added so far. " + + "contentLengthOnTrigger = " + injector.getContentLengthOnTrigger(), + injector.getContentLengthOnTrigger() > 16000); + injector.threadStartSem.release(); + rcvr.close(); + waitForSpans(ht, spans, 33); + } finally { + ht.destroy(); + } + } + + /** + * A FaultInjector that causes all flushes to fail until a specified + * number of milliseconds have passed. + */ + private static class TestInjectFlushFaults + extends HTracedSpanReceiver.FaultInjector { + private long remainingFaults; + + TestInjectFlushFaults(long remainingFaults) { + this.remainingFaults = remainingFaults; + } + + @Override + public synchronized void handleFlush() throws IOException { + if (remainingFaults > 0) { + remainingFaults--; + throw new IOException("Injected IOException into flush " + + "code path."); + } + } + } + + /** + * Test that even if the flush fails, the system stays stable and we can + * still close the span receiver. 
+ */ + @Test(timeout = 60000) + public void testPackedThreadHandlesFlushFailure() throws Exception { + final Random rand = new Random(321); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testPackedThreadHandlesFlushFailure"); + put(Conf.ADDRESS_KEY, ht.getHrpcAddr()); + put(Conf.PACKED_KEY, "true"); + }}); + TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 15); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * Test that even if the flush fails, the system stays stable and we can + * still close the span receiver. + */ + @Test(timeout = 60000) + public void testRestThreadHandlesFlushFailure() throws Exception { + final Random rand = new Random(321); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testRestThreadHandlesFlushFailure"); + put(Conf.ADDRESS_KEY, ht.getHttpAddr()); + put(Conf.PACKED_KEY, "false"); + }}); + TestInjectFlushFaults injector = new TestInjectFlushFaults(Long.MAX_VALUE); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 15); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * A FaultInjector that causes all flushes to fail until a specified + * number of milliseconds have passed. 
+ */ + private static class WaitForFlushes + extends HTracedSpanReceiver.FaultInjector { + final Semaphore flushSem; + + WaitForFlushes(int numFlushes) { + this.flushSem = new Semaphore(-numFlushes); + } + + @Override + public void handleFlush() throws IOException { + flushSem.release(); + } + } + + /** + * Test that the packed code works when performing multiple flushes. + */ + @Test(timeout = 60000) + public void testMultiplePackedFlushes() throws Exception { + final Random rand = new Random(123); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testMultiplePackedFlushes"); + put(Conf.ADDRESS_KEY, ht.getHrpcAddr()); + put(Conf.PACKED_KEY, "true"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1"); + }}); + WaitForFlushes injector = new WaitForFlushes(5); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 3); + while (true) { + for (Span span : spans) { + rcvr.receiveSpan(span); + } + if (injector.flushSem.availablePermits() >= 0) { + break; + } + Thread.sleep(1); + } + waitForSpans(ht, spans, 3); + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * Test that the REST code works when performing multiple flushes. 
+ */ + @Test(timeout = 60000) + public void testMultipleRestFlushes() throws Exception { + final Random rand = new Random(123); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testMultipleRestFlushes"); + put(Conf.ADDRESS_KEY, ht.getHttpAddr()); + put(Conf.PACKED_KEY, "false"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1"); + }}); + WaitForFlushes injector = new WaitForFlushes(5); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 3); + while (true) { + for (Span span : spans) { + rcvr.receiveSpan(span); + } + if (injector.flushSem.availablePermits() >= 0) { + break; + } + Thread.sleep(1); + } + waitForSpans(ht, spans, 3); + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * Test that the packed code works when performing multiple flushes. + */ + @Test(timeout = 60000) + public void testPackedRetryAfterFlushError() throws Exception { + final Random rand = new Random(123); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testPackedRetryAfterFlushError"); + put(Conf.ADDRESS_KEY, ht.getHrpcAddr()); + put(Conf.PACKED_KEY, "true"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000"); + put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100"); + }}); + TestInjectFlushFaults injector = new TestInjectFlushFaults(5); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 3); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + waitForSpans(ht, spans); + rcvr.close(); + } finally { + ht.destroy(); + } + } + + /** + * Test that the REST code works when performing multiple flushes. 
+ */ + @Test(timeout = 60000) + public void testRestRetryAfterFlushError() throws Exception { + final Random rand = new Random(123); + final HTracedProcess ht = new HTracedProcess.Builder().build(); + try { + HTraceConfiguration conf = HTraceConfiguration.fromMap( + new HashMap<String, String>() {{ + put(TracerId.TRACER_ID_KEY, "testRestRetryAfterFlushError"); + put(Conf.ADDRESS_KEY, ht.getHttpAddr()); + put(Conf.PACKED_KEY, "false"); + put(Conf.MAX_FLUSH_INTERVAL_MS_KEY, "1000"); + put(Conf.FLUSH_RETRY_DELAYS_KEY, "100,100,100,100,100,100,100"); + }}); + TestInjectFlushFaults injector = new TestInjectFlushFaults(5); + HTracedSpanReceiver rcvr = new HTracedSpanReceiver(conf, injector); + Span[] spans = TestUtil.randomSpans(rand, 3); + for (Span span : spans) { + rcvr.receiveSpan(span); + } + waitForSpans(ht, spans); + rcvr.close(); + } finally { + ht.destroy(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java new file mode 100644 index 0000000..bf038f1 --- /dev/null +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestHTracedReceiverConf.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.core.HTraceConfiguration; +import org.junit.Assert; +import org.junit.Test; + +public class TestHTracedReceiverConf { + private static final Log LOG = + LogFactory.getLog(TestHTracedReceiverConf.class); + + @Test(timeout = 60000) + public void testParseHostPort() throws Exception { + InetSocketAddress addr = new Conf( + HTraceConfiguration.fromKeyValuePairs( + Conf.ADDRESS_KEY, "example.com:8080")).endpoint; + Assert.assertEquals("example.com", addr.getHostName()); + Assert.assertEquals(8080, addr.getPort()); + + addr = new Conf( + HTraceConfiguration.fromKeyValuePairs( + Conf.ADDRESS_KEY, "127.0.0.1:8081")).endpoint; + Assert.assertEquals("127.0.0.1", addr.getHostName()); + Assert.assertEquals(8081, addr.getPort()); + + addr = new Conf( + HTraceConfiguration.fromKeyValuePairs( + Conf.ADDRESS_KEY, "[ff02:0:0:0:0:0:0:12]:9095")).endpoint; + Assert.assertEquals("ff02:0:0:0:0:0:0:12", addr.getHostName()); + Assert.assertEquals(9095, addr.getPort()); + } + + private static void verifyFail(String hostPort) { + try { + new Conf(HTraceConfiguration.fromKeyValuePairs( + Conf.ADDRESS_KEY, hostPort)); + Assert.fail("Expected bad host:port configuration " + hostPort + + " to fail, but it succeeded."); + } catch (IOException e) { + // expected + } + } + + @Test(timeout = 60000) + public void testFailToParseHostPort() throws Exception { + 
verifyFail("localhost"); // no port + verifyFail("127.0.0.1"); // no port + verifyFail(":8080"); // no hostname + verifyFail("bob[ff02:0:0:0:0:0:0:12]:9095"); // bracket at incorrect place + } + + @Test(timeout = 60000) + public void testGetIntArray() throws Exception { + int[] arr = Conf.getIntArray(""); + Assert.assertEquals(0, arr.length); + arr = Conf.getIntArray("123"); + Assert.assertEquals(1, arr.length); + Assert.assertEquals(123, arr[0]); + arr = Conf.getIntArray("1,2,3"); + Assert.assertEquals(3, arr.length); + Assert.assertEquals(1, arr[0]); + Assert.assertEquals(2, arr[1]); + Assert.assertEquals(3, arr[2]); + arr = Conf.getIntArray(",-4,5,66,"); + Assert.assertEquals(3, arr.length); + Assert.assertEquals(-4, arr[0]); + Assert.assertEquals(5, arr[1]); + Assert.assertEquals(66, arr[2]); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java new file mode 100644 index 0000000..ed7d904 --- /dev/null +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestPackedBuffer.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.impl; + +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.core.Span; +import org.apache.htrace.util.TestUtil; +import org.junit.Assert; +import org.junit.Test; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; + +public class TestPackedBuffer { + private static final Log LOG = LogFactory.getLog(TestPackedBuffer.class); + + @Test(timeout = 60000) + public void testWriteReqFrame() throws Exception { + byte[] arr = new byte[PackedBuffer.HRPC_REQ_FRAME_LENGTH]; + ByteBuffer bb = ByteBuffer.wrap(arr); + PackedBuffer buf = new PackedBuffer(bb); + PackedBuffer.writeReqFrame(bb, 1, 123, 456); + Assert.assertEquals(PackedBuffer.HRPC_REQ_FRAME_LENGTH, bb.position()); + Assert.assertEquals("48 54 52 43 " + + "01 00 00 00 " + + "7b 00 00 00 00 00 00 00 " + + "c8 01 00 00", + buf.toHexString()); + } + + @Test(timeout = 60000) + public void testPackSpans() throws Exception { + Random rand = new Random(123); + byte[] arr = new byte[16384]; + ByteBuffer bb = ByteBuffer.wrap(arr); + bb.limit(bb.capacity()); + PackedBuffer buf = new PackedBuffer(bb); + final int NUM_TEST_SPANS = 5; + Span[] spans = new Span[NUM_TEST_SPANS]; + for (int i = 0; i < NUM_TEST_SPANS; i++) { + spans[i] = TestUtil.randomSpan(rand); + } + for (int i = 0; i < NUM_TEST_SPANS; i++) { + buf.writeSpan(spans[i]); + } + LOG.info("wrote " + buf.toHexString()); + MessagePack msgpack = new 
MessagePack(PackedBuffer.MSGPACK_CONF); + MessageUnpacker unpacker = msgpack.newUnpacker(arr, 0, bb.position()); + Span[] respans = new Span[NUM_TEST_SPANS]; + for (int i = 0; i < NUM_TEST_SPANS; i++) { + respans[i] = PackedBuffer.readSpan(unpacker); + } + for (int i = 0; i < NUM_TEST_SPANS; i++) { + Assert.assertEquals("Failed to read back span " + i, + spans[i].toJson(), respans[i].toJson()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java new file mode 100644 index 0000000..630a02a --- /dev/null +++ b/htrace-htraced/src/test/java/org/apache/htrace/impl/TestTimeUtil.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.htrace.impl; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +public class TestTimeUtil { + /** + * Test that our deltaMs function can compute the time difference between any + * two monotonic times in milliseconds. + */ + @Test(timeout = 60000) + public void testDeltaMs() throws Exception { + Assert.assertEquals(0, TimeUtil.deltaMs(0, 0)); + Assert.assertEquals(1, TimeUtil.deltaMs(0, 1)); + Assert.assertEquals(0, TimeUtil.deltaMs(1, 0)); + Assert.assertEquals(10, TimeUtil.deltaMs(1000, 1010)); + long minMs = TimeUnit.MILLISECONDS.convert(Long.MIN_VALUE, + TimeUnit.NANOSECONDS); + long maxMs = TimeUnit.MILLISECONDS.convert(Long.MAX_VALUE, + TimeUnit.NANOSECONDS); + Assert.assertEquals(10, TimeUtil.deltaMs(minMs, minMs + 10)); + Assert.assertEquals(maxMs, TimeUtil.deltaMs(minMs, maxMs)); + Assert.assertEquals(11, TimeUtil.deltaMs(maxMs - 10, minMs)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/cb2351d2/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java ---------------------------------------------------------------------- diff --git a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java b/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java deleted file mode 100644 index 74731fa..0000000 --- a/htrace-htraced/src/test/java/org/apache/htrace/util/DataDir.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
/**
 * Small util for making a data directory for tests to use when running tests.
 * The directory is placed at {@code <base>/test-data/<random UUID>}, where
 * {@code <base>} comes from the {@link #TEST_BASE_DIRECTORY_KEY} system
 * property and defaults to {@link #TEST_BASE_DIRECTORY_DEFAULT}. Create an
 * instance of this class per unit test run and pass what {@link #getDataDir()}
 * returns as the location where daemons and tests dump data.
 * TODO: Add close on exit.
 */
public class DataDir {
  private File baseTestDir = null;
  private File testDir = null;

  /**
   * System property key to get base test directory value
   */
  public static final String TEST_BASE_DIRECTORY_KEY = "test.data.base.dir";

  /**
   * Default base directory for test output.
   */
  public static final String TEST_BASE_DIRECTORY_DEFAULT = "target";

  /**
   * Name of the directory, under the base directory, that holds the
   * per-instance data directories.
   */
  public static final String TEST_BASE_DIRECTORY_NAME = "test-data";

  /**
   * Should not be used directly by the unit tests, hence it is private.
   * Unit tests use a subdirectory of this directory.
   *
   * @return Where to write test data on local filesystem; usually
   *         {@link #TEST_BASE_DIRECTORY_DEFAULT}/{@link #TEST_BASE_DIRECTORY_NAME}
   */
  private synchronized File getBaseTestDir() {
    if (this.baseTestDir == null) {
      String testHome = System.getProperty(TEST_BASE_DIRECTORY_KEY,
          TEST_BASE_DIRECTORY_DEFAULT);
      this.baseTestDir = new File(testHome, TEST_BASE_DIRECTORY_NAME);
    }
    return this.baseTestDir;
  }

  /**
   * Returns the data directory of this instance, creating it on first use.
   * Every call returns the same absolute path.
   *
   * @return Absolute path to the dir created by this instance.
   * @throws IOException if the directory does not exist and cannot be created
   */
  public synchronized File getDataDir() throws IOException {
    if (this.testDir == null) {
      // Resolve to an absolute path before caching. A relative path passed to
      // htraced will have it create data dirs relative to its data dir rather
      // than in it, so callers must never see the relative form.
      File dir = new File(getBaseTestDir(), UUID.randomUUID().toString())
          .getAbsoluteFile();
      if (!dir.exists() && !dir.mkdirs()) {
        throw new IOException("Failed mkdirs for " + dir);
      }
      // Cache only after the directory exists, so a failed attempt is retried
      // on the next call instead of handing back a missing directory.
      this.testDir = dir;
    }
    return this.testDir;
  }

  /**
   * Fragile. Ugly. Presumes paths. Best we can do for now until htraced comes
   * local to this module and is moved out of src dir. Expects
   * {@code dataDir} to look like
   * {@code <top>/<module>/target/test-data/<name>} and returns {@code <top>}.
   *
   * @param dataDir A datadir gotten from {@link #getDataDir()}
   * @return Top-level of the checkout; never {@code null}.
   * @throws IllegalArgumentException if {@code dataDir} does not have the
   *         expected layout or is too shallow to have a top-level directory
   */
  public static File getTopLevelOfCheckout(final File dataDir) {
    // Need absolute else we run out of road when dir is relative to this
    // module.
    File absolute = dataDir.getAbsoluteFile();
    // Check we are where we think we are.
    File testDataDir = absolute.getParentFile();
    if (testDataDir == null
        || !testDataDir.getName().equals(TEST_BASE_DIRECTORY_NAME)) {
      throw new IllegalArgumentException(dataDir.toString());
    }
    // Do another check.
    File targetDir = testDataDir.getParentFile();
    if (targetDir == null
        || !targetDir.getName().equals(TEST_BASE_DIRECTORY_DEFAULT)) {
      throw new IllegalArgumentException(dataDir.toString());
    }
    // Back up last two dirs out of the htrace-htraced dir.
    File moduleDir = targetDir.getParentFile();
    File topLevel = (moduleDir == null) ? null : moduleDir.getParentFile();
    if (topLevel == null) {
      throw new IllegalArgumentException(dataDir.toString());
    }
    return topLevel;
  }
}
- */ -package org.apache.htrace.util; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.ProcessBuilder.Redirect; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.URL; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * To get instance of HTraced up and running, create an instance of this class. - * Upon successful construction, htraced is running using <code>dataDir</code> as directory to - * host data (leveldbs and logs). - * TODO: We expect to find the htraced in a very particular place. Fragile. Will break if stuff - * moves. - */ -public class HTracedProcess extends Process { - private static final Log LOG = LogFactory.getLog(HTracedProcess.class); - private final Process delegate; - - private final String httpAddr; - - /** - * Data send back from the HTraced process on the notification port. - */ - @JsonIgnoreProperties(ignoreUnknown = true) - public static class StartupNotificationData { - /** - * The hostname:port pair which the HTraced process uses for HTTP requests. - */ - @JsonProperty("HttpAddr") - String httpAddr; - - /** - * The process ID of the HTraced process. - */ - @JsonProperty("ProcessId") - long processId; - } - - public HTracedProcess(final File binPath, final File dataDir, - final String host) throws IOException { - // Create a notifier socket bound to a random port. - ServerSocket listener = new ServerSocket(0); - boolean success = false; - Process process = null; - try { - // Use a random port for the web address. No 'scheme' yet. - String webAddress = host + ":0"; - String logPath = new File(dataDir, "log.txt").getAbsolutePath(); - // Pass cmdline args to htraced to it uses our test dir for data. 
- ProcessBuilder pb = new ProcessBuilder(binPath.toString(), - "-Dlog.level=TRACE", - "-Dlog.path=" + logPath, - "-Dweb.address=" + webAddress, - "-Ddata.store.clear=true", - "-Dstartup.notification.address=localhost:" + listener.getLocalPort(), - "-Ddata.store.directories=" + dataDir.toString()); - pb.redirectErrorStream(true); - // Inherit STDERR/STDOUT i/o; dumps on console for now. Can add logs later. - pb.inheritIO(); - pb.directory(dataDir); - //assert pb.redirectInput() == Redirect.PIPE; - //assert pb.redirectOutput().file() == dataDir; - process = pb.start(); - assert process.getInputStream().read() == -1; - StartupNotificationData data = readStartupNotification(listener); - httpAddr = data.httpAddr; - LOG.info("Started htraced process " + data.processId + " with http " + - "address " + data.httpAddr + ", logging to " + logPath); - success = true; - } finally { - if (!success) { - // Clean up after failure - if (process != null) { - process.destroy(); - process = null; - } - } - delegate = process; - listener.close(); - } - } - - private static StartupNotificationData - readStartupNotification(ServerSocket listener) throws IOException { - Socket socket = listener.accept(); - try { - InputStream in = socket.getInputStream(); - ObjectMapper objectMapper = new ObjectMapper(); - StartupNotificationData data = objectMapper. 
- readValue(in, StartupNotificationData.class); - return data; - } finally { - socket.close(); - } - } - - public int hashCode() { - return delegate.hashCode(); - } - - public OutputStream getOutputStream() { - throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT"); - } - - public InputStream getInputStream() { - throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT"); - } - - public boolean equals(Object obj) { - return delegate.equals(obj); - } - - public InputStream getErrorStream() { - throw new UnsupportedOperationException("Unsupported until complaint; output on STDOUT"); - } - - public int waitFor() throws InterruptedException { - return delegate.waitFor(); - } - - public int exitValue() { - return delegate.exitValue(); - } - - public void destroy() { - delegate.destroy(); - } - - public String toString() { - return delegate.toString(); - } - - public String getHttpAddr() { - return httpAddr; - } - - /** - * Ugly but how else to do file-math? - * @param topLevel Presumes top-level of the htrace checkout. - * @return Path to the htraced binary. - */ - public static File getPathToHTraceBinaryFromTopLevel(final File topLevel) { - return new File(new File(new File(new File(topLevel, "htrace-htraced"), "go"), "build"), - "htraced"); - } -}
