Author: ndimiduk
Date: Thu Apr 24 15:01:26 2014
New Revision: 1589762

URL: http://svn.apache.org/r1589762
Log:
HBASE-11004 Extend traces through FSHLog#sync

Modified:
    
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
    
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
    
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
    hbase/trunk/pom.xml

Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
--- 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 (original)
+++ 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 Thu Apr 24 15:01:26 2014
@@ -70,6 +70,8 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
+import org.htrace.NullScope;
+import org.htrace.Span;
 import org.htrace.Trace;
 import org.htrace.TraceScope;
 
@@ -665,6 +667,7 @@ class FSHLog implements HLog, Syncable {
         LOG.debug("HLog closing. Skipping rolling of writer");
         return regionsToFlush;
       }
+      TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
       try {
         Path oldPath = getOldPath();
         Path newPath = getNewPath();
@@ -688,6 +691,8 @@ class FSHLog implements HLog, Syncable {
         }
       } finally {
         closeBarrier.endOp();
+        assert scope == NullScope.INSTANCE || !scope.isDetached();
+        scope.close();
       }
       return regionsToFlush;
     } finally {
@@ -856,6 +861,7 @@ class FSHLog implements HLog, Syncable {
     SyncFuture syncFuture = null;
     SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
       null: this.ringBufferEventHandler.attainSafePoint();
+    TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
     try {
       // Wait on the safe point to be achieved.  Send in a sync in case 
nothing has hit the
       // ring buffer between the above notification of writer that we want it 
to go to
@@ -863,7 +869,10 @@ class FSHLog implements HLog, Syncable {
       // 'sendSync' instead of 'sync' because we do not want this thread to 
block waiting on it
       // to come back.  Cleanup this syncFuture down below after we are ready 
to run again.
       try {
-        if (zigzagLatch != null) syncFuture = 
zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
+        if (zigzagLatch != null) {
+          Trace.addTimelineAnnotation("awaiting safepoint");
+          syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
+        }
       } catch (FailedSyncBeforeLogCloseException e) {
         if (isUnflushedEntries()) throw e;
         // Else, let is pass through to the close.
@@ -874,7 +883,11 @@ class FSHLog implements HLog, Syncable {
       // It is at the safe point.  Swap out writer from under the blocked 
writer thread.
       // TODO: This is close is inline with critical section.  Should happen 
in background?
       try {
-        if (this.writer != null) this.writer.close();
+        if (this.writer != null) {
+          Trace.addTimelineAnnotation("closing writer");
+          this.writer.close();
+          Trace.addTimelineAnnotation("writer closed");
+        }
         this.closeErrorCount.set(0);
       } catch (IOException ioe) {
         int errors = closeErrorCount.incrementAndGet();
@@ -915,6 +928,7 @@ class FSHLog implements HLog, Syncable {
         // It will be null if we failed our wait on safe point above.
         if (syncFuture != null) blockOnSync(syncFuture);
       }
+      scope.close();
     }
     return newPath;
   }
@@ -1139,8 +1153,7 @@ class FSHLog implements HLog, Syncable {
   throws IOException {
     if (!this.enabled || edits.isEmpty()) return this.highestUnsyncedSequence;
     if (this.closed) throw new IOException("Cannot append; log is closed");
-    // TODO: trace model here does not work any more.  It does not match how 
we append.
-    TraceScope traceScope = Trace.startSpan("FSHlog.append");
+    TraceScope scope = Trace.startSpan("FSHLog.append");
     // Make a key but do not set the WALEdit by region sequence id now -- set 
it to -1 for now --
     // and then later just before we write it out to the DFS stream, then set 
the sequence id;
     // late-binding.
@@ -1154,7 +1167,7 @@ class FSHLog implements HLog, Syncable {
       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
       FSWALEntry entry =
         new FSWALEntry(sequence, logKey, edits, sequenceId, inMemstore, htd, 
info);
-      truck.loadPayload(entry, traceScope.detach());
+      truck.loadPayload(entry, scope.detach());
     } finally {
       this.disruptor.getRingBuffer().publish(sequence);
     }
@@ -1164,7 +1177,7 @@ class FSHLog implements HLog, Syncable {
     // When we sync, we will sync to the current point, the txid of the last 
edit added.
     // Since we are single writer, the next txid should be the just next one 
in sequence;
     // do not explicitly specify it. Sequence id/txid is an implementation 
internal detail.
-    if (doSync) publishSyncThenBlockOnCompletion();
+    if (doSync) sync();
     return sequence;
   }
 
@@ -1289,10 +1302,13 @@ class FSHLog implements HLog, Syncable {
           }
           // I got something.  Lets run.  Save off current sequence number in 
case it changes
           // while we run.
+          TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
           long start = System.nanoTime();
           Throwable t = null;
           try {
+            Trace.addTimelineAnnotation("syncing writer");
             writer.sync();
+            Trace.addTimelineAnnotation("writer synced");
             currentSequence = updateHighestSyncedSequence(currentSequence);
           } catch (IOException e) {
             LOG.error("Error syncing, request close of hlog ", e);
@@ -1301,6 +1317,8 @@ class FSHLog implements HLog, Syncable {
             LOG.warn("UNEXPECTED", e);
             t = e;
           } finally {
+            // reattach the span to the future before releasing.
+            takeSyncFuture.setSpan(scope.detach());
             // First release what we 'took' from the queue.
             syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t);
             // Can we release other syncs?
@@ -1389,8 +1407,12 @@ class FSHLog implements HLog, Syncable {
   }
 
   private SyncFuture publishSyncOnRingBuffer() {
+    return publishSyncOnRingBuffer(null);
+  }
+
+  private SyncFuture publishSyncOnRingBuffer(Span span) {
     long sequence = this.disruptor.getRingBuffer().next();
-    SyncFuture syncFuture = getSyncFuture(sequence);
+    SyncFuture syncFuture = getSyncFuture(sequence, span);
     try {
       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
       truck.loadPayload(syncFuture);
@@ -1401,14 +1423,15 @@ class FSHLog implements HLog, Syncable {
   }
 
   // Sync all known transactions
-  private void publishSyncThenBlockOnCompletion() throws IOException {
-    blockOnSync(publishSyncOnRingBuffer());
+  private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
+    return blockOnSync(publishSyncOnRingBuffer(span));
   }
 
-  private void blockOnSync(final SyncFuture syncFuture) throws IOException {
+  private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
     // Now we have published the ringbuffer, halt the current thread until we 
get an answer back.
     try {
       syncFuture.get();
+      return syncFuture.getSpan();
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
       IOException ioe = new InterruptedIOException();
@@ -1419,13 +1442,13 @@ class FSHLog implements HLog, Syncable {
     }
   }
 
-  private SyncFuture getSyncFuture(final long sequence) {
+  private SyncFuture getSyncFuture(final long sequence, Span span) {
     SyncFuture syncFuture = 
this.syncFuturesByHandler.get(Thread.currentThread());
     if (syncFuture == null) {
       syncFuture = new SyncFuture();
       this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
     }
-    return syncFuture.reset(sequence);
+    return syncFuture.reset(sequence, span);
   }
 
   @Override
@@ -1472,17 +1495,35 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public void hsync() throws IOException {
-    publishSyncThenBlockOnCompletion();
+    TraceScope scope = Trace.startSpan("FSHLog.hsync");
+    try {
+      scope = 
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
   }
 
   @Override
   public void hflush() throws IOException {
-    publishSyncThenBlockOnCompletion();
+    TraceScope scope = Trace.startSpan("FSHLog.hflush");
+    try {
+      scope = 
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
   }
 
   @Override
   public void sync() throws IOException {
-    publishSyncThenBlockOnCompletion();
+    TraceScope scope = Trace.startSpan("FSHLog.sync");
+    try {
+      scope = 
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
   }
 
   @Override
@@ -1491,7 +1532,13 @@ class FSHLog implements HLog, Syncable {
       // Already sync'd.
       return;
     }
-    publishSyncThenBlockOnCompletion();
+    TraceScope scope = Trace.startSpan("FSHLog.sync");
+    try {
+      scope = 
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
   }
 
   void requestLogRoll() {
@@ -1731,8 +1778,9 @@ class FSHLog implements HLog, Syncable {
    * YMMV).
    * <p>Herein, we have an array into which we store the sync futures as they 
come in.  When we
    * have a 'batch', we'll then pass what we have collected to a SyncRunner 
thread to do the
-   * filesystem sync.  When it completes, it will then call {@link 
SyncFuture#done(long)} on each
-   * of SyncFutures in the batch to release blocked Handler threads.
+   * filesystem sync.  When it completes, it will then call
+   * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the 
batch to release
+   * blocked Handler threads.
    * <p>I've tried various effects to try and make latencies low while keeping 
throughput high.
    * I've tried keeping a single Queue of SyncFutures in this class appending 
to its tail as the
    * syncs coming and having sync runner threads poll off the head to 'finish' 
completed
@@ -1782,15 +1830,13 @@ class FSHLog implements HLog, Syncable {
       // add appends to dfsclient as they come in.  Batching appends doesn't 
give any significant
       // benefit on measurement.  Handler sync calls we will batch up.
 
-      // TODO: Trace only working for appends, not for syncs.
-      TraceScope scope =
-        truck.hasSpanPayload() ? Trace.continueSpan(truck.unloadSpanPayload()) 
: null;
       try {
         if (truck.hasSyncFuturePayload()) {
           this.syncFutures[this.syncFuturesCount++] = 
truck.unloadSyncFuturePayload();
           // Force flush of syncs if we are carrying a full complement of 
syncFutures.
           if (this.syncFuturesCount == this.syncFutures.length) endOfBatch = 
true;
         } else if (truck.hasFSWALEntryPayload()) {
+          TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
           try {
             append(truck.unloadFSWALEntryPayload());
           } catch (Exception e) {
@@ -1799,6 +1845,9 @@ class FSHLog implements HLog, Syncable {
             cleanupOutstandingSyncsOnException(sequence, e);
             // Return to keep processing.
             return;
+          } finally {
+            assert scope == NullScope.INSTANCE || !scope.isDetached();
+            scope.close(); // append scope is complete
           }
         } else {
           // They can't both be null.  Fail all up to this!!!
@@ -1828,10 +1877,6 @@ class FSHLog implements HLog, Syncable {
         this.syncFuturesCount = 0;
       } catch (Throwable t) {
         LOG.error("UNEXPECTED!!!", t);
-      } finally {
-        // This scope only makes sense for the append. Syncs will be pulled-up 
short so tracing
-        // will not give a good representation. TODO: Fix.
-        if (scope != null) scope.close();
       }
     }
 

Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
--- 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
 (original)
+++ 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
 Thu Apr 24 15:01:26 2014
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.htrace.Span;
 
 /**
  * A Future on a filesystem sync call.  It given to a client or 'Handler' for 
it to wait on till
@@ -41,8 +42,8 @@ import org.apache.hadoop.classification.
  * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
  * call every time a Handler asks for it.
  * <p>
- * SyncFutures are immutable but recycled. Call {@link #reset(long)} before 
use even if it
- * the first time, start the sync, then park the 'hitched' thread on a call to
+ * SyncFutures are immutable but recycled. Call {@link #reset(long, Span)} 
before use even
+ * if it the first time, start the sync, then park the 'hitched' thread on a 
call to
  * {@link #get()}
  */
 @InterfaceAudience.Private
@@ -58,7 +59,7 @@ class SyncFuture {
    * The sequence that was set in here when we were marked done. Should be 
equal
    * or > ringBufferSequence.  Put this data member into the NOT_DONE state 
while this
    * class is in use.  But for the first position on construction, let it be 
-1 so we can
-   * immediately call {@link #reset(long)} below and it will work.
+   * immediately call {@link #reset(long, Span)} below and it will work.
    */
   private long doneSequence = -1;
 
@@ -70,18 +71,37 @@ class SyncFuture {
   private Thread t;
 
   /**
+   * Optionally carry a disconnected scope to the SyncRunner.
+   */
+  private Span span;
+
+  /**
    * Call this method to clear old usage and get it ready for new deploy. Call
    * this method even if it is being used for the first time.
-   * 
-   * @param sequence
+   *
+   * @param sequence sequenceId from this Future's position in the RingBuffer
    * @return this
    */
   synchronized SyncFuture reset(final long sequence) {
+    return reset(sequence, null);
+  }
+
+  /**
+   * Call this method to clear old usage and get it ready for new deploy. Call
+   * this method even if it is being used for the first time.
+   *
+   * @param sequence sequenceId from this Future's position in the RingBuffer
+   * @param span curren span, detached from caller. Don't forget to attach it 
when
+   *             resuming after a call to {@link #get()}.
+   * @return this
+   */
+  synchronized SyncFuture reset(final long sequence, Span span) {
     if (t != null && t != Thread.currentThread()) throw new 
IllegalStateException();
     t = Thread.currentThread();
     if (!isDone()) throw new IllegalStateException("" + sequence + " " + 
Thread.currentThread());
     this.doneSequence = NOT_DONE;
     this.ringBufferSequence = sequence;
+    this.span = span;
     return this;
   }
 
@@ -95,6 +115,24 @@ class SyncFuture {
   }
 
   /**
+   * Retrieve the {@code span} instance from this Future. EventHandler calls
+   * this method to continue the span. Thread waiting on this Future musn't 
call
+   * this method until AFTER calling {@link #get()} and the future has been
+   * released back to the originating thread.
+   */
+  synchronized Span getSpan() {
+    return this.span;
+  }
+
+  /**
+   * Used to re-attach a {@code span} to the Future. Called by the EventHandler
+   * after a it has completed processing and detached the span from its scope.
+   */
+  synchronized void setSpan(Span span) {
+    this.span = span;
+  }
+
+  /**
    * @param sequence Sync sequence at which this future 'completed'.
    * @param t Can be null.  Set if we are 'completing' on error (and this 't' 
is the error).
    * @return True if we successfully marked this outstanding future as 
completed/done.

Modified: 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
--- 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
 (original)
+++ 
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
 Thu Apr 24 15:01:26 2014
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.htrace.SpanReceiver;
 import org.htrace.Trace;
 
@@ -40,25 +39,25 @@ public class SpanReceiverHost {
   private Configuration conf;
   private boolean closed = false;
 
-  private static enum SingleTonholder {
+  private static enum SingletonHolder {
     INSTANCE;
     Object lock = new Object();
     SpanReceiverHost host = null;
   }
 
   public static SpanReceiverHost getInstance(Configuration conf) {
-    if (SingleTonholder.INSTANCE.host != null) {
-      return SingleTonholder.INSTANCE.host;
+    if (SingletonHolder.INSTANCE.host != null) {
+      return SingletonHolder.INSTANCE.host;
     }
-    synchronized (SingleTonholder.INSTANCE.lock) {
-      if (SingleTonholder.INSTANCE.host != null) {
-        return SingleTonholder.INSTANCE.host;
+    synchronized (SingletonHolder.INSTANCE.lock) {
+      if (SingletonHolder.INSTANCE.host != null) {
+        return SingletonHolder.INSTANCE.host;
       }
 
       SpanReceiverHost host = new SpanReceiverHost(conf);
       host.loadSpanReceivers();
-      SingleTonholder.INSTANCE.host = host;
-      return SingleTonholder.INSTANCE.host;
+      SingletonHolder.INSTANCE.host = host;
+      return SingletonHolder.INSTANCE.host;
     }
 
   }

Modified: 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
--- 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
 (original)
+++ 
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
 Thu Apr 24 15:01:26 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.Tool;
@@ -58,6 +59,10 @@ import com.yammer.metrics.core.Histogram
 import com.yammer.metrics.core.Meter;
 import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.reporting.ConsoleReporter;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+import org.htrace.impl.ProbabilitySampler;
 
 /**
  * This class runs performance benchmarks for {@link HLog}.
@@ -110,15 +115,34 @@ public final class HLogPerformanceEvalua
     private final HRegion region;
     private final int syncInterval;
     private final HTableDescriptor htd;
+    private final Sampler loopSampler;
 
     HLogPutBenchmark(final HRegion region, final HTableDescriptor htd,
-        final long numIterations, final boolean noSync, final int 
syncInterval) {
+        final long numIterations, final boolean noSync, final int syncInterval,
+        final double traceFreq) {
       this.numIterations = numIterations;
       this.noSync = noSync;
       this.syncInterval = syncInterval;
       this.numFamilies = htd.getColumnFamilies().length;
       this.region = region;
       this.htd = htd;
+      String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
+      if (spanReceivers == null || spanReceivers.isEmpty()) {
+        loopSampler = Sampler.NEVER;
+      } else {
+        if (traceFreq <= 0.0) {
+          LOG.warn("Tracing enabled but traceFreq=0.");
+          loopSampler = Sampler.NEVER;
+        } else if (traceFreq >= 1.0) {
+          loopSampler = Sampler.ALWAYS;
+          if (numIterations > 1000) {
+            LOG.warn("Full tracing of all iterations will produce a lot of 
data. Be sure your"
+              + " SpanReciever can keep up.");
+          }
+        } else {
+          loopSampler = new ProbabilitySampler(traceFreq);
+        }
+      }
     }
 
     @Override
@@ -130,29 +154,39 @@ public final class HLogPerformanceEvalua
       ArrayList<UUID> clusters = new ArrayList<UUID>();
       long nonce = HConstants.NO_NONCE;
 
+      TraceScope threadScope =
+        Trace.startSpan("HLogPerfEval." + Thread.currentThread().getName());
       try {
         long startTime = System.currentTimeMillis();
         int lastSync = 0;
         for (int i = 0; i < numIterations; ++i) {
-          long now = System.nanoTime();
-          Put put = setupPut(rand, key, value, numFamilies);
-          WALEdit walEdit = new WALEdit();
-          addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
-          HRegionInfo hri = region.getRegionInfo();
-          hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
-            region.getSequenceId(), true, nonce, nonce);
-          if (!this.noSync) {
-            if (++lastSync >= this.syncInterval) {
-              hlog.sync();
-              lastSync = 0;
+          assert Trace.currentSpan() == threadScope.getSpan() : "Span leak 
detected.";
+          TraceScope loopScope = Trace.startSpan("runLoopIter" + i, 
loopSampler);
+          try {
+            long now = System.nanoTime();
+            Put put = setupPut(rand, key, value, numFamilies);
+            WALEdit walEdit = new WALEdit();
+            addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
+            HRegionInfo hri = region.getRegionInfo();
+            hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
+              region.getSequenceId(), true, nonce, nonce);
+            if (!this.noSync) {
+              if (++lastSync >= this.syncInterval) {
+                hlog.sync();
+                lastSync = 0;
+              }
             }
+            latencyHistogram.update(System.nanoTime() - now);
+          } finally {
+            loopScope.close();
           }
-          latencyHistogram.update(System.nanoTime() - now);
         }
         long totalTime = (System.currentTimeMillis() - startTime);
         logBenchmarkResult(Thread.currentThread().getName(), numIterations, 
totalTime);
       } catch (Exception e) {
         LOG.error(getClass().getSimpleName() + " Thread failed", e);
+      } finally {
+        threadScope.close();
       }
     }
   }
@@ -172,6 +206,9 @@ public final class HLogPerformanceEvalua
     long roll = Long.MAX_VALUE;
     boolean compress = false;
     String cipher = null;
+    String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
+    boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
+    double traceFreq = 1.0;
     // Process command line args
     for (int i = 0; i < args.length; i++) {
       String cmd = args[i];
@@ -208,6 +245,8 @@ public final class HLogPerformanceEvalua
           compress = true;
         } else if (cmd.equals("-encryption")) {
           cipher = args[++i];
+        } else if (cmd.equals("-traceFreq")) {
+          traceFreq = Double.parseDouble(args[++i]);
         } else if (cmd.equals("-h")) {
           printUsageAndExit();
         } else if (cmd.equals("--help")) {
@@ -248,6 +287,10 @@ public final class HLogPerformanceEvalua
     FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
     FileSystem fs = FileSystem.get(getConf());
     LOG.info("FileSystem: " + fs);
+
+    SpanReceiverHost receiverHost = trace ? 
SpanReceiverHost.getInstance(getConf()) : null;
+    TraceScope scope = Trace.startSpan("HLogPerfEval", trace ? Sampler.ALWAYS 
: Sampler.NEVER);
+
     try {
       if (rootRegionDir == null) {
         rootRegionDir = 
TEST_UTIL.getDataTestDirOnTestFS("HLogPerformanceEvaluation");
@@ -320,11 +363,13 @@ public final class HLogPerformanceEvalua
       });
       hlog.rollWriter();
       HRegion region = null;
+
       try {
         region = openRegion(fs, rootRegionDir, htd, hlog);
         ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
         long putTime =
-          runBenchmark(new HLogPutBenchmark(region, htd, numIterations, 
noSync, syncInterval),
+          runBenchmark(Trace.wrap(
+              new HLogPutBenchmark(region, htd, numIterations, noSync, 
syncInterval, traceFreq)),
             numThreads);
         logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" 
+ numIterations +
           ", syncInterval=" + syncInterval, numIterations * numThreads, 
putTime);
@@ -356,6 +401,8 @@ public final class HLogPerformanceEvalua
     } finally {
       // We may be called inside a test that wants to keep on using the fs.
       if (!noclosefs) fs.close();
+      scope.close();
+      if (receiverHost != null) receiverHost.closeReceivers();
     }
 
     return(0);
@@ -435,6 +482,8 @@ public final class HLogPerformanceEvalua
       "e.g. all edit seq ids when verifying");
     System.err.println("  -roll <N>        Roll the way every N appends");
     System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, 
e.g. AES");
+    System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 
1.0, " +
+      "only respected when tracing is enabled, ie 
-Dhbase.trace.spanreceiver.classes=...");
     System.err.println("");
     System.err.println("Examples:");
     System.err.println("");

Modified: hbase/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Thu Apr 24 15:01:26 2014
@@ -919,7 +919,7 @@
     <jersey.version>1.8</jersey.version>
     <jruby.version>1.6.8</jruby.version>
     <junit.version>4.11</junit.version>
-    <htrace.version>3.0.3</htrace.version>
+    <htrace.version>3.0.4</htrace.version>
     <log4j.version>1.2.17</log4j.version>
     <mockito-all.version>1.9.0</mockito-all.version>
     <protobuf.version>2.5.0</protobuf.version>


Reply via email to