Repository: phoenix
Updated Branches:
  refs/heads/4.0 a88e24e1f -> 1332e6794


PHOENIX-1553 Fix failing PhoenixTracingEndToEndIT test


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1332e679
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1332e679
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1332e679

Branch: refs/heads/4.0
Commit: 1332e67941bb7ef2198dd16e7efdfb4dfbf6a12c
Parents: a88e24e
Author: James Taylor <jtay...@salesforce.com>
Authored: Fri Aug 8 16:19:58 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Fri Aug 8 16:19:58 2014 -0700

----------------------------------------------------------------------
 .../coprocessor/BaseScannerRegionObserver.java  | 22 +++----
 .../phoenix/iterate/ParallelIterators.java      |  5 +-
 .../org/apache/phoenix/trace/util/Tracing.java  | 63 ++++++++++++++++++++
 3 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1332e679/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 379de36..730f2a1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
+import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ServerUtil;
-import org.cloudera.htrace.Trace;
-import org.cloudera.htrace.TraceScope;
+import org.cloudera.htrace.Span;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -103,21 +103,21 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
     public final RegionScanner postScannerOpen(
             final ObserverContext<RegionCoprocessorEnvironment> c, final Scan 
scan,
             final RegionScanner s) throws IOException {
-        try {
+       try {
             if (!isRegionObserverFor(scan)) {
                 return s;
             }
             throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
-            boolean success = false;
+            boolean success =false;
             // turn on tracing, if its enabled
-            final TraceScope child = Trace.startSpan("Phoenix scanner openned 
on server");
+            final Span child = Tracing.childOnServer(scan, rawConf, 
SCANNER_OPENED_TRACE_INFO);
             try {
                 RegionScanner scanner = doPostScannerOpen(c, scan, s);
                 scanner = new DelegateRegionScanner(scanner) {
                     @Override
                     public void close() throws IOException {
-                        if (child.getSpan() != null) {
-                            child.getSpan().stop();
+                        if (child != null) {
+                            child.stop();
                         }
                         delegate.close();
                     }
@@ -125,13 +125,13 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
                 success = true;
                 return scanner;
             } finally {
-                if (!success && child.getSpan() != null) {
-                    child.getSpan().stop();
+                if (!success && child != null) {
+                    child.stop();
                 }
-            } 
+            }
         } catch (Throwable t) {
             
ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(),
 t);
             return null; // impossible
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1332e679/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 1f2083f..7600e47 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -61,6 +61,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
@@ -346,7 +347,7 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
                 // Delay the swapping of start/stop row until row so we don't 
muck with the intersect logic
                 ScanUtil.swapStartStopRowIfReversed(splitScan);
                 Future<PeekingResultIterator> future =
-                    executor.submit(new JobCallable<PeekingResultIterator>() {
+                    executor.submit(Tracing.wrap(new 
JobCallable<PeekingResultIterator>() {
 
                     @Override
                     public PeekingResultIterator call() throws Exception {
@@ -368,7 +369,7 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
                     public Object getJobId() {
                         return ParallelIterators.this;
                     }
-                });
+                }, "Parallel scanner for table: " + 
tableRef.getTable().getName().getString()));
                 futures.add(new 
Pair<KeyRange,Future<PeekingResultIterator>>(split,future));
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1332e679/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java 
b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
index 9b0079f..5913cfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java
@@ -20,13 +20,19 @@ package org.apache.phoenix.trace.util;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.call.CallWrapper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceInfo;
 import org.cloudera.htrace.TraceScope;
 import org.cloudera.htrace.Tracer;
 import org.cloudera.htrace.impl.ProbabilitySampler;
@@ -40,10 +46,13 @@ import com.google.common.base.Function;
  */
 public class Tracing {
 
+    private static final Log LOG = LogFactory.getLog(Tracing.class);
+
     private static final String SEPARATOR = ".";
     // Constants for tracing across the wire
     public static final String TRACE_ID_ATTRIBUTE_KEY = 
"phoenix.trace.traceid";
     public static final String SPAN_ID_ATTRIBUTE_KEY = "phoenix.trace.spanid";
+    private static final String START_SPAN_MESSAGE = "Span received on server. 
Starting child";
 
     // Constants for passing into the metrics system
     public static final String TRACE_METRIC_PREFIX = "phoenix.trace.instance";
@@ -153,6 +162,60 @@ public class Tracing {
                 + SEPARATOR + span.getSpanId();
     }
 
+    /**
+     * Start a server-side child span for the given operation.
+     * <p>
+     * If this thread is already tracing, the new span is started as a child of the thread's
+     * current span. Otherwise, checks whether the operation carries the tracing attributes
+     * ({@link #TRACE_ID_ATTRIBUTE_KEY} and {@link #SPAN_ID_ATTRIBUTE_KEY}), and if so, starts
+     * a new span with the operation's specified span as its parent.
+     * <p>
+     * This should only be run on the server-side as we base tracing on if we are currently
+     * tracing (started higher in the call-stack) or if the operation has the tracing
+     * attributes defined. As such, we would expect to continue the trace on the server-side
+     * based on the original sampling parameters.
+     * @param scan operation whose tracing attributes are checked when this thread is not
+     *            already tracing
+     * @param conf {@link Configuration} handed through to the span-enabling helper
+     * @param description description of the child span to start
+     * @return the span that was started. When this thread is not tracing and the operation
+     *         is missing a tracing attribute, this is {@code NullSpan.INSTANCE} rather than
+     *         <tt>null</tt>. NOTE(review): the caller in this patch still null-checks the
+     *         result, so the started scope's span is presumably allowed to be <tt>null</tt>
+     *         - confirm against the tracing library.
+     */
+    public static Span childOnServer(OperationWithAttributes scan, 
Configuration conf,
+            String description) {
+        // check to see if we are currently tracing. Generally, this will only 
work when we go to
+        // 0.96. CPs should always be setting up and tearing down their own 
tracing
+        Span current = Trace.currentSpan();
+        if (current == null) {
+            // it's not tracing yet, but maybe it should be.
+            current = enable(scan, conf, description);
+        } else {
+            current = Trace.startSpan(description, current).getSpan();
+        }
+        return current;
+    }
+
+    /**
+     * Check to see if this operation has tracing enabled, i.e. carries both the
+     * {@link #TRACE_ID_ATTRIBUTE_KEY} and {@link #SPAN_ID_ATTRIBUTE_KEY} attributes, and if so,
+     * start a new span with the operation's specified span as its parent.
+     * <p>
+     * A trace id without a span id is logged as an error and treated as "not tracing".
+     * @param map operation whose attributes are checked
+     * @param conf {@link Configuration}; not read by this method as written - the sampler
+     *            used is always <tt>SERVER_TRACE_LEVEL</tt>
+     * @param description description of the child span to start; appended to the fixed
+     *            start-span message
+     * @return the span of the newly started scope, or {@code NullSpan.INSTANCE} (not
+     *         <tt>null</tt>) if either tracing attribute is missing.
+     */
+    @SuppressWarnings("unchecked")
+    private static Span enable(OperationWithAttributes map, Configuration 
conf, String description) {
+        byte[] traceid = map.getAttribute(TRACE_ID_ATTRIBUTE_KEY);
+        if (traceid == null) {
+            return NullSpan.INSTANCE;
+        }
+        byte[] spanid = map.getAttribute(SPAN_ID_ATTRIBUTE_KEY);
+        if (spanid == null) {
+            LOG.error("TraceID set to " + Bytes.toLong(traceid) + ", but span 
id was not set!");
+            return NullSpan.INSTANCE;
+        }
+        Sampler<?> sampler = SERVER_TRACE_LEVEL;
+        TraceInfo parent = new TraceInfo(Bytes.toLong(traceid), 
Bytes.toLong(spanid));
+        return Trace.startSpan(START_SPAN_MESSAGE + ": " + description,
+            (Sampler<TraceInfo>) sampler, parent).getSpan();
+    }
+
     public static Span child(Span s, String d) {
         if (s == null) {
             return NullSpan.INSTANCE;

Reply via email to