Author: ndimiduk
Date: Thu Apr 24 15:01:26 2014
New Revision: 1589762
URL: http://svn.apache.org/r1589762
Log:
HBASE-11004 Extend traces through FSHLog#sync
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
hbase/trunk/pom.xml
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
Thu Apr 24 15:01:26 2014
@@ -70,6 +70,8 @@ import org.apache.hadoop.hbase.util.FSUt
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
+import org.htrace.NullScope;
+import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
@@ -665,6 +667,7 @@ class FSHLog implements HLog, Syncable {
LOG.debug("HLog closing. Skipping rolling of writer");
return regionsToFlush;
}
+ TraceScope scope = Trace.startSpan("FSHLog.rollWriter");
try {
Path oldPath = getOldPath();
Path newPath = getNewPath();
@@ -688,6 +691,8 @@ class FSHLog implements HLog, Syncable {
}
} finally {
closeBarrier.endOp();
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close();
}
return regionsToFlush;
} finally {
@@ -856,6 +861,7 @@ class FSHLog implements HLog, Syncable {
SyncFuture syncFuture = null;
SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
null: this.ringBufferEventHandler.attainSafePoint();
+ TraceScope scope = Trace.startSpan("FSHLog.replaceWriter");
try {
// Wait on the safe point to be achieved. Send in a sync in case
nothing has hit the
// ring buffer between the above notification of writer that we want it
to go to
@@ -863,7 +869,10 @@ class FSHLog implements HLog, Syncable {
// 'sendSync' instead of 'sync' because we do not want this thread to
block waiting on it
// to come back. Cleanup this syncFuture down below after we are ready
to run again.
try {
- if (zigzagLatch != null) syncFuture =
zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
+ if (zigzagLatch != null) {
+ Trace.addTimelineAnnotation("awaiting safepoint");
+ syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
+ }
} catch (FailedSyncBeforeLogCloseException e) {
if (isUnflushedEntries()) throw e;
// Else, let is pass through to the close.
@@ -874,7 +883,11 @@ class FSHLog implements HLog, Syncable {
// It is at the safe point. Swap out writer from under the blocked
writer thread.
// TODO: This is close is inline with critical section. Should happen
in background?
try {
- if (this.writer != null) this.writer.close();
+ if (this.writer != null) {
+ Trace.addTimelineAnnotation("closing writer");
+ this.writer.close();
+ Trace.addTimelineAnnotation("writer closed");
+ }
this.closeErrorCount.set(0);
} catch (IOException ioe) {
int errors = closeErrorCount.incrementAndGet();
@@ -915,6 +928,7 @@ class FSHLog implements HLog, Syncable {
// It will be null if we failed our wait on safe point above.
if (syncFuture != null) blockOnSync(syncFuture);
}
+ scope.close();
}
return newPath;
}
@@ -1139,8 +1153,7 @@ class FSHLog implements HLog, Syncable {
throws IOException {
if (!this.enabled || edits.isEmpty()) return this.highestUnsyncedSequence;
if (this.closed) throw new IOException("Cannot append; log is closed");
- // TODO: trace model here does not work any more. It does not match how
we append.
- TraceScope traceScope = Trace.startSpan("FSHlog.append");
+ TraceScope scope = Trace.startSpan("FSHLog.append");
// Make a key but do not set the WALEdit by region sequence id now -- set
it to -1 for now --
// and then later just before we write it out to the DFS stream, then set
the sequence id;
// late-binding.
@@ -1154,7 +1167,7 @@ class FSHLog implements HLog, Syncable {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
FSWALEntry entry =
new FSWALEntry(sequence, logKey, edits, sequenceId, inMemstore, htd,
info);
- truck.loadPayload(entry, traceScope.detach());
+ truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
}
@@ -1164,7 +1177,7 @@ class FSHLog implements HLog, Syncable {
// When we sync, we will sync to the current point, the txid of the last
edit added.
// Since we are single writer, the next txid should be the just next one
in sequence;
// do not explicitly specify it. Sequence id/txid is an implementation
internal detail.
- if (doSync) publishSyncThenBlockOnCompletion();
+ if (doSync) sync();
return sequence;
}
@@ -1289,10 +1302,13 @@ class FSHLog implements HLog, Syncable {
}
// I got something. Lets run. Save off current sequence number in
case it changes
// while we run.
+ TraceScope scope = Trace.continueSpan(takeSyncFuture.getSpan());
long start = System.nanoTime();
Throwable t = null;
try {
+ Trace.addTimelineAnnotation("syncing writer");
writer.sync();
+ Trace.addTimelineAnnotation("writer synced");
currentSequence = updateHighestSyncedSequence(currentSequence);
} catch (IOException e) {
LOG.error("Error syncing, request close of hlog ", e);
@@ -1301,6 +1317,8 @@ class FSHLog implements HLog, Syncable {
LOG.warn("UNEXPECTED", e);
t = e;
} finally {
+ // reattach the span to the future before releasing.
+ takeSyncFuture.setSpan(scope.detach());
// First release what we 'took' from the queue.
syncCount += releaseSyncFuture(takeSyncFuture, currentSequence, t);
// Can we release other syncs?
@@ -1389,8 +1407,12 @@ class FSHLog implements HLog, Syncable {
}
private SyncFuture publishSyncOnRingBuffer() {
+ return publishSyncOnRingBuffer(null);
+ }
+
+ private SyncFuture publishSyncOnRingBuffer(Span span) {
long sequence = this.disruptor.getRingBuffer().next();
- SyncFuture syncFuture = getSyncFuture(sequence);
+ SyncFuture syncFuture = getSyncFuture(sequence, span);
try {
RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
truck.loadPayload(syncFuture);
@@ -1401,14 +1423,15 @@ class FSHLog implements HLog, Syncable {
}
// Sync all known transactions
- private void publishSyncThenBlockOnCompletion() throws IOException {
- blockOnSync(publishSyncOnRingBuffer());
+ private Span publishSyncThenBlockOnCompletion(Span span) throws IOException {
+ return blockOnSync(publishSyncOnRingBuffer(span));
}
- private void blockOnSync(final SyncFuture syncFuture) throws IOException {
+ private Span blockOnSync(final SyncFuture syncFuture) throws IOException {
// Now we have published the ringbuffer, halt the current thread until we
get an answer back.
try {
syncFuture.get();
+ return syncFuture.getSpan();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
IOException ioe = new InterruptedIOException();
@@ -1419,13 +1442,13 @@ class FSHLog implements HLog, Syncable {
}
}
- private SyncFuture getSyncFuture(final long sequence) {
+ private SyncFuture getSyncFuture(final long sequence, Span span) {
SyncFuture syncFuture =
this.syncFuturesByHandler.get(Thread.currentThread());
if (syncFuture == null) {
syncFuture = new SyncFuture();
this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture);
}
- return syncFuture.reset(sequence);
+ return syncFuture.reset(sequence, span);
}
@Override
@@ -1472,17 +1495,35 @@ class FSHLog implements HLog, Syncable {
@Override
public void hsync() throws IOException {
- publishSyncThenBlockOnCompletion();
+ TraceScope scope = Trace.startSpan("FSHLog.hsync");
+ try {
+ scope =
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+ } finally {
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close();
+ }
}
@Override
public void hflush() throws IOException {
- publishSyncThenBlockOnCompletion();
+ TraceScope scope = Trace.startSpan("FSHLog.hflush");
+ try {
+ scope =
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+ } finally {
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close();
+ }
}
@Override
public void sync() throws IOException {
- publishSyncThenBlockOnCompletion();
+ TraceScope scope = Trace.startSpan("FSHLog.sync");
+ try {
+ scope =
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+ } finally {
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close();
+ }
}
@Override
@@ -1491,7 +1532,13 @@ class FSHLog implements HLog, Syncable {
// Already sync'd.
return;
}
- publishSyncThenBlockOnCompletion();
+ TraceScope scope = Trace.startSpan("FSHLog.sync");
+ try {
+ scope =
Trace.continueSpan(publishSyncThenBlockOnCompletion(scope.detach()));
+ } finally {
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close();
+ }
}
void requestLogRoll() {
@@ -1731,8 +1778,9 @@ class FSHLog implements HLog, Syncable {
* YMMV).
* <p>Herein, we have an array into which we store the sync futures as they
come in. When we
* have a 'batch', we'll then pass what we have collected to a SyncRunner
thread to do the
- * filesystem sync. When it completes, it will then call {@link
SyncFuture#done(long)} on each
- * of SyncFutures in the batch to release blocked Handler threads.
+ * filesystem sync. When it completes, it will then call
+ * {@link SyncFuture#done(long, Throwable)} on each of SyncFutures in the
batch to release
+ * blocked Handler threads.
* <p>I've tried various effects to try and make latencies low while keeping
throughput high.
* I've tried keeping a single Queue of SyncFutures in this class appending
to its tail as the
* syncs coming and having sync runner threads poll off the head to 'finish'
completed
@@ -1782,15 +1830,13 @@ class FSHLog implements HLog, Syncable {
// add appends to dfsclient as they come in. Batching appends doesn't
give any significant
// benefit on measurement. Handler sync calls we will batch up.
- // TODO: Trace only working for appends, not for syncs.
- TraceScope scope =
- truck.hasSpanPayload() ? Trace.continueSpan(truck.unloadSpanPayload())
: null;
try {
if (truck.hasSyncFuturePayload()) {
this.syncFutures[this.syncFuturesCount++] =
truck.unloadSyncFuturePayload();
// Force flush of syncs if we are carrying a full complement of
syncFutures.
if (this.syncFuturesCount == this.syncFutures.length) endOfBatch =
true;
} else if (truck.hasFSWALEntryPayload()) {
+ TraceScope scope = Trace.continueSpan(truck.unloadSpanPayload());
try {
append(truck.unloadFSWALEntryPayload());
} catch (Exception e) {
@@ -1799,6 +1845,9 @@ class FSHLog implements HLog, Syncable {
cleanupOutstandingSyncsOnException(sequence, e);
// Return to keep processing.
return;
+ } finally {
+ assert scope == NullScope.INSTANCE || !scope.isDetached();
+ scope.close(); // append scope is complete
}
} else {
// They can't both be null. Fail all up to this!!!
@@ -1828,10 +1877,6 @@ class FSHLog implements HLog, Syncable {
this.syncFuturesCount = 0;
} catch (Throwable t) {
LOG.error("UNEXPECTED!!!", t);
- } finally {
- // This scope only makes sense for the append. Syncs will be pulled-up
short so tracing
- // will not give a good representation. TODO: Fix.
- if (scope != null) scope.close();
}
}
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java
Thu Apr 24 15:01:26 2014
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.htrace.Span;
/**
* A Future on a filesystem sync call. It given to a client or 'Handler' for
it to wait on till
@@ -41,8 +42,8 @@ import org.apache.hadoop.classification.
* SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
* call every time a Handler asks for it.
* <p>
- * SyncFutures are immutable but recycled. Call {@link #reset(long)} before
use even if it
- * the first time, start the sync, then park the 'hitched' thread on a call to
+ * SyncFutures are immutable but recycled. Call {@link #reset(long, Span)}
before use even
+ * if it is the first time, start the sync, then park the 'hitched' thread on a
call to
* {@link #get()}
*/
@InterfaceAudience.Private
@@ -58,7 +59,7 @@ class SyncFuture {
* The sequence that was set in here when we were marked done. Should be
equal
* or > ringBufferSequence. Put this data member into the NOT_DONE state
while this
* class is in use. But for the first position on construction, let it be
-1 so we can
- * immediately call {@link #reset(long)} below and it will work.
+ * immediately call {@link #reset(long, Span)} below and it will work.
*/
private long doneSequence = -1;
@@ -70,18 +71,37 @@ class SyncFuture {
private Thread t;
/**
+ * Optionally carry a disconnected scope to the SyncRunner.
+ */
+ private Span span;
+
+ /**
* Call this method to clear old usage and get it ready for new deploy. Call
* this method even if it is being used for the first time.
- *
- * @param sequence
+ *
+ * @param sequence sequenceId from this Future's position in the RingBuffer
* @return this
*/
synchronized SyncFuture reset(final long sequence) {
+ return reset(sequence, null);
+ }
+
+ /**
+ * Call this method to clear old usage and get it ready for new deploy. Call
+ * this method even if it is being used for the first time.
+ *
+ * @param sequence sequenceId from this Future's position in the RingBuffer
+ * @param span current span, detached from caller. Don't forget to attach it
when
+ * resuming after a call to {@link #get()}.
+ * @return this
+ */
+ synchronized SyncFuture reset(final long sequence, Span span) {
if (t != null && t != Thread.currentThread()) throw new
IllegalStateException();
t = Thread.currentThread();
if (!isDone()) throw new IllegalStateException("" + sequence + " " +
Thread.currentThread());
this.doneSequence = NOT_DONE;
this.ringBufferSequence = sequence;
+ this.span = span;
return this;
}
@@ -95,6 +115,24 @@ class SyncFuture {
}
/**
+ * Retrieve the {@code span} instance from this Future. EventHandler calls
+ * this method to continue the span. Thread waiting on this Future mustn't
call
+ * this method until AFTER calling {@link #get()} and the future has been
+ * released back to the originating thread.
+ */
+ synchronized Span getSpan() {
+ return this.span;
+ }
+
+ /**
+ * Used to re-attach a {@code span} to the Future. Called by the EventHandler
+ * after it has completed processing and detached the span from its scope.
+ */
+ synchronized void setSpan(Span span) {
+ this.span = span;
+ }
+
+ /**
* @param sequence Sync sequence at which this future 'completed'.
* @param t Can be null. Set if we are 'completing' on error (and this 't'
is the error).
* @return True if we successfully marked this outstanding future as
completed/done.
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/trace/SpanReceiverHost.java
Thu Apr 24 15:01:26 2014
@@ -24,7 +24,6 @@ import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
import org.htrace.SpanReceiver;
import org.htrace.Trace;
@@ -40,25 +39,25 @@ public class SpanReceiverHost {
private Configuration conf;
private boolean closed = false;
- private static enum SingleTonholder {
+ private static enum SingletonHolder {
INSTANCE;
Object lock = new Object();
SpanReceiverHost host = null;
}
public static SpanReceiverHost getInstance(Configuration conf) {
- if (SingleTonholder.INSTANCE.host != null) {
- return SingleTonholder.INSTANCE.host;
+ if (SingletonHolder.INSTANCE.host != null) {
+ return SingletonHolder.INSTANCE.host;
}
- synchronized (SingleTonholder.INSTANCE.lock) {
- if (SingleTonholder.INSTANCE.host != null) {
- return SingleTonholder.INSTANCE.host;
+ synchronized (SingletonHolder.INSTANCE.lock) {
+ if (SingletonHolder.INSTANCE.host != null) {
+ return SingletonHolder.INSTANCE.host;
}
SpanReceiverHost host = new SpanReceiverHost(conf);
host.loadSpanReceivers();
- SingleTonholder.INSTANCE.host = host;
- return SingleTonholder.INSTANCE.host;
+ SingletonHolder.INSTANCE.host = host;
+ return SingletonHolder.INSTANCE.host;
}
}
Modified:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
(original)
+++
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
Thu Apr 24 15:01:26 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.Tool;
@@ -58,6 +59,10 @@ import com.yammer.metrics.core.Histogram
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.ConsoleReporter;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+import org.htrace.impl.ProbabilitySampler;
/**
* This class runs performance benchmarks for {@link HLog}.
@@ -110,15 +115,34 @@ public final class HLogPerformanceEvalua
private final HRegion region;
private final int syncInterval;
private final HTableDescriptor htd;
+ private final Sampler loopSampler;
HLogPutBenchmark(final HRegion region, final HTableDescriptor htd,
- final long numIterations, final boolean noSync, final int
syncInterval) {
+ final long numIterations, final boolean noSync, final int syncInterval,
+ final double traceFreq) {
this.numIterations = numIterations;
this.noSync = noSync;
this.syncInterval = syncInterval;
this.numFamilies = htd.getColumnFamilies().length;
this.region = region;
this.htd = htd;
+ String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
+ if (spanReceivers == null || spanReceivers.isEmpty()) {
+ loopSampler = Sampler.NEVER;
+ } else {
+ if (traceFreq <= 0.0) {
+ LOG.warn("Tracing enabled but traceFreq <= 0.");
+ loopSampler = Sampler.NEVER;
+ } else if (traceFreq >= 1.0) {
+ loopSampler = Sampler.ALWAYS;
+ if (numIterations > 1000) {
+ LOG.warn("Full tracing of all iterations will produce a lot of
data. Be sure your"
+ + " SpanReceiver can keep up.");
+ }
+ } else {
+ loopSampler = new ProbabilitySampler(traceFreq);
+ }
+ }
}
@Override
@@ -130,29 +154,39 @@ public final class HLogPerformanceEvalua
ArrayList<UUID> clusters = new ArrayList<UUID>();
long nonce = HConstants.NO_NONCE;
+ TraceScope threadScope =
+ Trace.startSpan("HLogPerfEval." + Thread.currentThread().getName());
try {
long startTime = System.currentTimeMillis();
int lastSync = 0;
for (int i = 0; i < numIterations; ++i) {
- long now = System.nanoTime();
- Put put = setupPut(rand, key, value, numFamilies);
- WALEdit walEdit = new WALEdit();
- addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
- HRegionInfo hri = region.getRegionInfo();
- hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
- region.getSequenceId(), true, nonce, nonce);
- if (!this.noSync) {
- if (++lastSync >= this.syncInterval) {
- hlog.sync();
- lastSync = 0;
+ assert Trace.currentSpan() == threadScope.getSpan() : "Span leak
detected.";
+ TraceScope loopScope = Trace.startSpan("runLoopIter" + i,
loopSampler);
+ try {
+ long now = System.nanoTime();
+ Put put = setupPut(rand, key, value, numFamilies);
+ WALEdit walEdit = new WALEdit();
+ addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
+ HRegionInfo hri = region.getRegionInfo();
+ hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
+ region.getSequenceId(), true, nonce, nonce);
+ if (!this.noSync) {
+ if (++lastSync >= this.syncInterval) {
+ hlog.sync();
+ lastSync = 0;
+ }
}
+ latencyHistogram.update(System.nanoTime() - now);
+ } finally {
+ loopScope.close();
}
- latencyHistogram.update(System.nanoTime() - now);
}
long totalTime = (System.currentTimeMillis() - startTime);
logBenchmarkResult(Thread.currentThread().getName(), numIterations,
totalTime);
} catch (Exception e) {
LOG.error(getClass().getSimpleName() + " Thread failed", e);
+ } finally {
+ threadScope.close();
}
}
}
@@ -172,6 +206,9 @@ public final class HLogPerformanceEvalua
long roll = Long.MAX_VALUE;
boolean compress = false;
String cipher = null;
+ String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
+ boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
+ double traceFreq = 1.0;
// Process command line args
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
@@ -208,6 +245,8 @@ public final class HLogPerformanceEvalua
compress = true;
} else if (cmd.equals("-encryption")) {
cipher = args[++i];
+ } else if (cmd.equals("-traceFreq")) {
+ traceFreq = Double.parseDouble(args[++i]);
} else if (cmd.equals("-h")) {
printUsageAndExit();
} else if (cmd.equals("--help")) {
@@ -248,6 +287,10 @@ public final class HLogPerformanceEvalua
FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
FileSystem fs = FileSystem.get(getConf());
LOG.info("FileSystem: " + fs);
+
+ SpanReceiverHost receiverHost = trace ?
SpanReceiverHost.getInstance(getConf()) : null;
+ TraceScope scope = Trace.startSpan("HLogPerfEval", trace ? Sampler.ALWAYS
: Sampler.NEVER);
+
try {
if (rootRegionDir == null) {
rootRegionDir =
TEST_UTIL.getDataTestDirOnTestFS("HLogPerformanceEvaluation");
@@ -320,11 +363,13 @@ public final class HLogPerformanceEvalua
});
hlog.rollWriter();
HRegion region = null;
+
try {
region = openRegion(fs, rootRegionDir, htd, hlog);
ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
long putTime =
- runBenchmark(new HLogPutBenchmark(region, htd, numIterations,
noSync, syncInterval),
+ runBenchmark(Trace.wrap(
+ new HLogPutBenchmark(region, htd, numIterations, noSync,
syncInterval, traceFreq)),
numThreads);
logBenchmarkResult("Summary: threads=" + numThreads + ", iterations="
+ numIterations +
", syncInterval=" + syncInterval, numIterations * numThreads,
putTime);
@@ -356,6 +401,8 @@ public final class HLogPerformanceEvalua
} finally {
// We may be called inside a test that wants to keep on using the fs.
if (!noclosefs) fs.close();
+ scope.close();
+ if (receiverHost != null) receiverHost.closeReceivers();
}
return(0);
@@ -435,6 +482,8 @@ public final class HLogPerformanceEvalua
"e.g. all edit seq ids when verifying");
System.err.println(" -roll <N> Roll the way every N appends");
System.err.println(" -encryption <A> Encrypt the WAL with algorithm A,
e.g. AES");
+ System.err.println(" -traceFreq <N> Rate of trace sampling. Default:
1.0, " +
+ "only respected when tracing is enabled, ie
-Dhbase.trace.spanreceiver.classes=...");
System.err.println("");
System.err.println("Examples:");
System.err.println("");
Modified: hbase/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1589762&r1=1589761&r2=1589762&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Thu Apr 24 15:01:26 2014
@@ -919,7 +919,7 @@
<jersey.version>1.8</jersey.version>
<jruby.version>1.6.8</jruby.version>
<junit.version>4.11</junit.version>
- <htrace.version>3.0.3</htrace.version>
+ <htrace.version>3.0.4</htrace.version>
<log4j.version>1.2.17</log4j.version>
<mockito-all.version>1.9.0</mockito-all.version>
<protobuf.version>2.5.0</protobuf.version>