Repository: incubator-htrace
Updated Branches:
  refs/heads/master 82950b600 -> 7b34b4242


HTRACE-112. Fix LocalFileSpanReceiver to avoid BG thread and problems around close (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/7b34b424
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/7b34b424
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/7b34b424

Branch: refs/heads/master
Commit: 7b34b42424998cbf4a3f22d2bbeed29e5e00f9f4
Parents: 82950b6
Author: Colin P. Mccabe <[email protected]>
Authored: Sat Feb 21 18:15:28 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Sat Feb 21 18:15:28 2015 -0800

----------------------------------------------------------------------
 .../htrace/impl/LocalFileSpanReceiver.java      | 201 +++++++++++++------
 1 file changed, 142 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/7b34b424/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
index 2683f81..e2ddad1 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.htrace.impl;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.commons.logging.Log;
@@ -31,106 +32,188 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
-import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.StandardOpenOption;
 import java.util.UUID;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Writes the spans it receives to a local file.
- * A production LocalFileSpanReceiver should use a real CSV format.
  */
 public class LocalFileSpanReceiver implements SpanReceiver {
   public static final Log LOG = LogFactory.getLog(LocalFileSpanReceiver.class);
   public static final String PATH_KEY = "local-file-span-receiver.path";
   public static final String CAPACITY_KEY = "local-file-span-receiver.capacity";
-  // default capacity for the executors blocking queue
   public static final int CAPACITY_DEFAULT = 5000;
-  // default timeout duration when calling executor.awaitTermination()
-  public static final long EXECUTOR_TERMINATION_TIMEOUT_DURATION_DEFAULT = 60;
   private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
-  private String file;
-  private Writer writer;
-  private ExecutorService executor;
-  private long executorTerminationTimeoutDuration;
+  private final String path;
+
+  private byte[][] bufferedSpans;
+  private int bufferedSpansIndex;
+  private final ReentrantLock bufferLock = new ReentrantLock();
+
+  private final FileOutputStream stream;
+  private final FileChannel channel;
+  private final ReentrantLock channelLock = new ReentrantLock();
 
   public LocalFileSpanReceiver(HTraceConfiguration conf) {
-    this.executorTerminationTimeoutDuration = EXECUTOR_TERMINATION_TIMEOUT_DURATION_DEFAULT;
     int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT);
-    this.file = conf.get(PATH_KEY);
-    if (file == null || file.isEmpty()) {
+    if (capacity < 1) {
+      throw new IllegalArgumentException(CAPACITY_KEY + " must not be " +
+          "less than 1.");
+    }
+    this.path = conf.get(PATH_KEY);
+    if (path == null || path.isEmpty()) {
       throw new IllegalArgumentException("must configure " + PATH_KEY);
     }
-    this.executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(capacity));
     boolean success = false;
-    FileOutputStream fos = null;
     try {
-      fos = new FileOutputStream(file, true);
-      this.writer = new BufferedWriter(
-          new OutputStreamWriter(fos,"UTF-8"));
-      success = true;
+      this.stream = new FileOutputStream(path, true);
     } catch (IOException ioe) {
-      LOG.warn("Error opening " + file + ": " + ioe.getMessage());
+      LOG.error("Error opening " + path + ": " + ioe.getMessage());
       throw new RuntimeException(ioe);
-    } finally {
-      if (!success) {
-        if (fos != null) {
-          try {
-            fos.close();
-          } catch (IOException e) {
-            LOG.error("Error closing output stream for " + file, e);
-          }
-        }
+    }
+    this.channel = stream.getChannel();
+    if (this.channel == null) {
+      try {
+        this.stream.close();
+      } catch (IOException e) {
+        LOG.error("Error closing " + path, e);
       }
+      LOG.error("Failed to get channel for " + path);
+      throw new RuntimeException("Failed to get channel for " + path);
     }
+    this.bufferedSpans = new byte[capacity][];
+    this.bufferedSpansIndex = 0;
   }
 
-  private class WriteSpanRunnable implements Runnable {
-    public final Span span;
+  /**
+   * Number of buffers to use in FileChannel#write.
+   *
+   * On UNIX, FileChannel#write uses writev-- a kernel interface that allows
+   * us to send multiple buffers at once.  This is more efficient than making a
+   * separate write call for each buffer, since it minimizes the number of
+   * transitions from userspace to kernel space.
+   */
+  private final int WRITEV_SIZE = 20;
 
-    public WriteSpanRunnable(Span span) {
-      this.span = span;
-    }
+  /**
+   * Flushes a bufferedSpans array.
+   */
+  private void doFlush(byte[][] toFlush, int len) throws IOException {
+    int bidx = 0, widx = 0;
+    ByteBuffer writevBufs[] = new ByteBuffer[2 * WRITEV_SIZE];
+    ByteBuffer newlineBuf = ByteBuffer.wrap(new byte[] { (byte)0xa });
 
-    @Override
-    public void run() {
-      try {
-        JSON_WRITER.writeValue(writer, span);
-        writer.write("%n");
-      } catch (IOException e) {
-        LOG.error("Error when writing to file: " + file, e);
+    while (true) {
+      if (widx == writevBufs.length) {
+        channel.write(writevBufs);
+        widx = 0;
       }
+      if (bidx == len) {
+        break;
+      }
+      writevBufs[widx] = ByteBuffer.wrap(toFlush[bidx]);
+      writevBufs[widx + 1] = newlineBuf;
+      bidx++;
+      widx+=2;
+    }
+    if (widx > 0) {
+      channel.write(writevBufs, 0, widx);
     }
   }
 
   @Override
   public void receiveSpan(Span span) {
-    executor.submit(new WriteSpanRunnable(span));
+    // Serialize the span data into a byte[].  Note that we're not holding the
+    // lock here, to improve concurrency.
+    byte jsonBuf[] = null;
+    try {
+      jsonBuf = JSON_WRITER.writeValueAsBytes(span);
+    } catch (JsonProcessingException e) {
+      LOG.error("receiveSpan(path=" + path + ", span=" + span + "): " +
+          "Json processing error: " + e.getMessage());
+      return;
+    }
+
+    // Grab the bufferLock and put our jsonBuf into the list of buffers to
+    // flush.
+    byte toFlush[][] = null;
+    bufferLock.lock();
+    try {
+      if (bufferedSpans == null) {
+        LOG.debug("receiveSpan(path=" + path + ", span=" + span + "): " +
+            "LocalFileSpanReceiver for " + path + " is closed.");
+        return;
+      }
+      bufferedSpans[bufferedSpansIndex] = jsonBuf;
+      bufferedSpansIndex++;
+      if (bufferedSpansIndex == bufferedSpans.length) {
+        // If we've hit the limit for the number of buffers to flush,
+        // swap out the existing bufferedSpans array for a new array, and
+        // prepare to flush those spans to disk.
+        toFlush = bufferedSpans;
+        bufferedSpansIndex = 0;
+        bufferedSpans = new byte[bufferedSpans.length][];
+      }
+    } finally {
+      bufferLock.unlock();
+    }
+    if (toFlush != null) {
+      // We released the bufferLock above, to avoid blocking concurrent
+      // receiveSpan calls.  But now, we must take the channelLock, to make
+      // sure that we have sole access to the output channel.  If we did not do
+      // this, we might get interleaved output.
+      //
+      // There is a small chance that another thread doing a flush of more
+      // recent spans could get ahead of us here, and take the lock before we
+      // do.  This is ok, since spans don't have to be written out in order.
+      channelLock.lock();
+      try {
+        doFlush(toFlush, toFlush.length);
+      } catch (IOException ioe) {
+        LOG.error("Error flushing buffers to " + path + ": " +
+            ioe.getMessage());
+      } finally {
+        channelLock.unlock();
+      }
+    }
   }
 
   @Override
   public void close() throws IOException {
-    executor.shutdown();
+    byte toFlush[][] = null;
+    int numToFlush = 0;
+    bufferLock.lock();
     try {
-      if (!executor.awaitTermination(this.executorTerminationTimeoutDuration,
-                                     TimeUnit.SECONDS)) {
-        LOG.warn("Was not able to process all remaining spans to write upon closing in: "
-            + this.executorTerminationTimeoutDuration + "s");
+      if (bufferedSpans == null) {
+        LOG.info("LocalFileSpanReceiver for " + path + " was already closed.");
+        return;
       }
-    } catch (InterruptedException e1) {
-      LOG.warn("Thread interrupted when terminating executor.", e1);
+      numToFlush = bufferedSpansIndex;
+      bufferedSpansIndex = 0;
+      toFlush = bufferedSpans;
+      bufferedSpans = null;
+    } finally {
+      bufferLock.unlock();
     }
-
+    channelLock.lock();
     try {
-      writer.close();
-    } catch (IOException e) {
-      LOG.error("Error closing writer for file: " + file, e);
+      doFlush(toFlush, numToFlush);
+    } catch (IOException ioe) {
+      LOG.error("Error flushing buffers to " + path + ": " +
+          ioe.getMessage());
+    } finally {
+      try {
+        stream.close();
+      } catch (IOException e) {
+        LOG.error("Error closing stream for " + path, e);
+      }
+      channelLock.unlock();
    }
  }
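
A rough usage sketch of the reworked receiver follows. The configuration keys and the buffer-then-flush behavior come from the patch above; HTraceConfiguration.fromMap, Trace.addReceiver/removeReceiver, and the example path and capacity values are assumptions made for illustration, not something this commit adds or requires.

// Illustrative sketch only -- names outside this patch (fromMap, addReceiver,
// the example path and capacity) are assumptions, not part of this commit.
import java.util.HashMap;
import java.util.Map;

import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Trace;
import org.apache.htrace.impl.LocalFileSpanReceiver;

public class LocalFileSpanReceiverUsageSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("local-file-span-receiver.path", "/tmp/htrace-spans.json");
    // Spans are serialized to JSON immediately, buffered in memory, and
    // flushed with a gathering write once this many spans have accumulated.
    conf.put("local-file-span-receiver.capacity", "5000");

    LocalFileSpanReceiver receiver =
        new LocalFileSpanReceiver(HTraceConfiguration.fromMap(conf));
    Trace.addReceiver(receiver);

    // ... traced code runs here; completed spans are handed to receiveSpan(),
    // which no longer passes them off to a background thread ...

    Trace.removeReceiver(receiver);
    // close() synchronously flushes whatever is still buffered and closes the
    // file; there is no executor to shut down or await.
    receiver.close();
  }
}

With the executor gone, close() is deterministic: anything receiveSpan() has buffered is written out before the file is closed, which is the "problems around close" part of the fix.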

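The doFlush() comment above explains why the patch batches buffers into a single FileChannel#write call. Below is a minimal, self-contained sketch of that gathering-write pattern against plain JDK NIO; the file name and record contents are invented, and duplicate() is used so the shared newline buffer is not consumed after the first record, a simplification rather than a copy of the patch's loop.

import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class GatheringWriteSketch {
  public static void main(String[] args) throws Exception {
    // Two fake JSON records standing in for serialized spans.
    byte[][] records = {
        "{\"desc\":\"span-1\"}".getBytes(StandardCharsets.UTF_8),
        "{\"desc\":\"span-2\"}".getBytes(StandardCharsets.UTF_8),
    };
    ByteBuffer newline = ByteBuffer.wrap(new byte[] { (byte) 0xa });

    FileOutputStream out = new FileOutputStream("/tmp/spans-sketch.json", true);
    FileChannel channel = out.getChannel();
    try {
      // One buffer per record plus a newline buffer after each one.  A single
      // write(ByteBuffer[]) call hands all of them to the kernel together
      // (writev on UNIX) instead of issuing one write per record.
      ByteBuffer[] bufs = new ByteBuffer[2 * records.length];
      for (int i = 0; i < records.length; i++) {
        bufs[2 * i] = ByteBuffer.wrap(records[i]);
        bufs[2 * i + 1] = newline.duplicate();
      }
      channel.write(bufs);
    } finally {
      out.close();
    }
  }
}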