Repository: incubator-htrace
Updated Branches:
  refs/heads/master 82950b600 -> 7b34b4242


HTRACE-112. Fix LocalFileSpanReceiver to avoid BG thread and problems around close (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/7b34b424
Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/7b34b424
Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/7b34b424

Branch: refs/heads/master
Commit: 7b34b42424998cbf4a3f22d2bbeed29e5e00f9f4
Parents: 82950b6
Author: Colin P. Mccabe <[email protected]>
Authored: Sat Feb 21 18:15:28 2015 -0800
Committer: Colin P. Mccabe <[email protected]>
Committed: Sat Feb 21 18:15:28 2015 -0800

----------------------------------------------------------------------
 .../htrace/impl/LocalFileSpanReceiver.java      | 201 +++++++++++++------
 1 file changed, 142 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/7b34b424/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
----------------------------------------------------------------------
diff --git a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
index 2683f81..e2ddad1 100644
--- a/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
+++ b/htrace-core/src/main/java/org/apache/htrace/impl/LocalFileSpanReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.htrace.impl;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import org.apache.commons.logging.Log;
@@ -31,106 +32,188 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
-import java.io.InputStreamReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.FileSystems;
+import java.nio.file.StandardOpenOption;
 import java.util.UUID;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Writes the spans it receives to a local file.
- * A production LocalFileSpanReceiver should use a real CSV format.
  */
 public class LocalFileSpanReceiver implements SpanReceiver {
   public static final Log LOG = LogFactory.getLog(LocalFileSpanReceiver.class);
   public static final String PATH_KEY = "local-file-span-receiver.path";
   public static final String CAPACITY_KEY = "local-file-span-receiver.capacity";
-  // default capacity for the executors blocking queue
   public static final int CAPACITY_DEFAULT = 5000;
-  // default timeout duration when calling executor.awaitTermination()
-  public static final long EXECUTOR_TERMINATION_TIMEOUT_DURATION_DEFAULT = 60;
   private static ObjectWriter JSON_WRITER = new ObjectMapper().writer();
-  private String file;
-  private Writer writer;
-  private ExecutorService executor;
-  private long executorTerminationTimeoutDuration;
+  private final String path;
+
+  private byte[][] bufferedSpans;
+  private int bufferedSpansIndex;
+  private final ReentrantLock bufferLock = new ReentrantLock();
+
+  private final FileOutputStream stream;
+  private final FileChannel channel;
+  private final ReentrantLock channelLock = new ReentrantLock();
 
   public LocalFileSpanReceiver(HTraceConfiguration conf) {
-    this.executorTerminationTimeoutDuration = EXECUTOR_TERMINATION_TIMEOUT_DURATION_DEFAULT;
     int capacity = conf.getInt(CAPACITY_KEY, CAPACITY_DEFAULT);
-    this.file = conf.get(PATH_KEY);
-    if (file == null || file.isEmpty()) {
+    if (capacity < 1) {
+      throw new IllegalArgumentException(CAPACITY_KEY + " must not be " +
+          "less than 1.");
+    }
+    this.path = conf.get(PATH_KEY);
+    if (path == null || path.isEmpty()) {
       throw new IllegalArgumentException("must configure " + PATH_KEY);
     }
-    this.executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(capacity));
     boolean success = false;
-    FileOutputStream fos = null;
     try {
-      fos = new FileOutputStream(file, true);
-      this.writer = new BufferedWriter(
-          new OutputStreamWriter(fos,"UTF-8"));
-      success = true;
+      this.stream = new FileOutputStream(path, true);
     } catch (IOException ioe) {
-      LOG.warn("Error opening " + file + ": " + ioe.getMessage());
+      LOG.error("Error opening " + path + ": " + ioe.getMessage());
       throw new RuntimeException(ioe);
-    } finally {
-      if (!success) {
-        if (fos != null) {
-          try {
-            fos.close();
-          } catch (IOException e) {
-            LOG.error("Error closing output stream for " + file, e);
-          }
-        }
+    }
+    this.channel = stream.getChannel();
+    if (this.channel == null) {
+      try {
+        this.stream.close();
+      } catch (IOException e) {
+        LOG.error("Error closing " + path, e);
       }
+      LOG.error("Failed to get channel for " + path);
+      throw new RuntimeException("Failed to get channel for " + path);
     }
+    this.bufferedSpans = new byte[capacity][];
+    this.bufferedSpansIndex = 0;
   }
 
-  private class WriteSpanRunnable implements Runnable {
-    public final Span span;
+  /**
+   * Number of buffers to use in FileChannel#write.
+   *
+   * On UNIX, FileChannel#write uses writev-- a kernel interface that allows
+   * us to send multiple buffers at once.  This is more efficient than making a
+   * separate write call for each buffer, since it minimizes the number of
+   * transitions from userspace to kernel space.
+   */
+  private final int WRITEV_SIZE = 20;
 
-    public WriteSpanRunnable(Span span) {
-      this.span = span;
-    }
+  /**
+   * Flushes a bufferedSpans array.
+   */
+  private void doFlush(byte[][] toFlush, int len) throws IOException {
+    int bidx = 0, widx = 0;
+    ByteBuffer writevBufs[] = new ByteBuffer[2 * WRITEV_SIZE];
+    ByteBuffer newlineBuf = ByteBuffer.wrap(new byte[] { (byte)0xa });
 
-    @Override
-    public void run() {
-      try {
-        JSON_WRITER.writeValue(writer, span);
-        writer.write("%n");
-      } catch (IOException e) {
-        LOG.error("Error when writing to file: " + file, e);
+    while (true) {
+      if (widx == writevBufs.length) {
+        channel.write(writevBufs);
+        widx = 0;
       }
+      if (bidx == len) {
+        break;
+      }
+      writevBufs[widx] = ByteBuffer.wrap(toFlush[bidx]);
+      writevBufs[widx + 1] = newlineBuf;
+      bidx++;
+      widx+=2;
+    }
+    if (widx > 0) {
+      channel.write(writevBufs, 0, widx);
     }
   }
 
   @Override
   public void receiveSpan(Span span) {
-    executor.submit(new WriteSpanRunnable(span));
+    // Serialize the span data into a byte[].  Note that we're not holding the
+    // lock here, to improve concurrency.
+    byte jsonBuf[] = null;
+    try {
+      jsonBuf = JSON_WRITER.writeValueAsBytes(span);
+    } catch (JsonProcessingException e) {
+        LOG.error("receiveSpan(path=" + path + ", span=" + span + "): " +
+                  "Json processing error: " + e.getMessage());
+      return;
+    }
+
+    // Grab the bufferLock and put our jsonBuf into the list of buffers to
+    // flush. 
+    byte toFlush[][] = null;
+    bufferLock.lock();
+    try {
+      if (bufferedSpans == null) {
+        LOG.debug("receiveSpan(path=" + path + ", span=" + span + "): " +
+                  "LocalFileSpanReceiver for " + path + " is closed.");
+        return;
+      }
+      bufferedSpans[bufferedSpansIndex] = jsonBuf;
+      bufferedSpansIndex++;
+      if (bufferedSpansIndex == bufferedSpans.length) {
+        // If we've hit the limit for the number of buffers to flush, 
+        // swap out the existing bufferedSpans array for a new array, and
+        // prepare to flush those spans to disk.
+        toFlush = bufferedSpans;
+        bufferedSpansIndex = 0;
+        bufferedSpans = new byte[bufferedSpans.length][];
+      }
+    } finally {
+      bufferLock.unlock();
+    }
+    if (toFlush != null) {
+      // We released the bufferLock above, to avoid blocking concurrent
+      // receiveSpan calls.  But now, we must take the channelLock, to make
+      // sure that we have sole access to the output channel.  If we did not do
+      // this, we might get interleaved output.
+      //
+      // There is a small chance that another thread doing a flush of more
+      // recent spans could get ahead of us here, and take the lock before we
+      // do.  This is ok, since spans don't have to be written out in order.
+      channelLock.lock();
+      try {
+        doFlush(toFlush, toFlush.length);
+      } catch (IOException ioe) {
+        LOG.error("Error flushing buffers to " + path + ": " +
+            ioe.getMessage());
+      } finally {
+        channelLock.unlock();
+      }
+    }
   }
 
   @Override
   public void close() throws IOException {
-    executor.shutdown();
+    byte toFlush[][] = null;
+    int numToFlush = 0;
+    bufferLock.lock();
     try {
-      if (!executor.awaitTermination(this.executorTerminationTimeoutDuration,
-          TimeUnit.SECONDS)) {
-        LOG.warn("Was not able to process all remaining spans to write upon closing in: "
-            + this.executorTerminationTimeoutDuration + "s");
+      if (bufferedSpans == null) {
+        LOG.info("LocalFileSpanReceiver for " + path + " was already closed.");
+        return;
       }
-    } catch (InterruptedException e1) {
-      LOG.warn("Thread interrupted when terminating executor.", e1);
+      numToFlush = bufferedSpansIndex;
+      bufferedSpansIndex = 0;
+      toFlush = bufferedSpans;
+      bufferedSpans = null;
+    } finally {
+      bufferLock.unlock();
     }
-
+    channelLock.lock();
     try {
-      writer.close();
-    } catch (IOException e) {
-      LOG.error("Error closing writer for file: " + file, e);
+      doFlush(toFlush, numToFlush);
+    } catch (IOException ioe) {
+      LOG.error("Error flushing buffers to " + path + ": " +
+          ioe.getMessage());
+    } finally {
+      try {
+        stream.close();
+      } catch (IOException e) {
+        LOG.error("Error closing stream for " + path, e);
+      }
+      channelLock.unlock();
     }
   }
 

Reply via email to