PHOENIX-177: Collect usage and performance metrics

Add basic Dapper-like tracing (using Cloudera's HTrace library)
to Phoenix requests. This is the basic infrastructure to
support more holistic, non-profiler-based analysis.

This patch includes, among other things, the
infrastructure to use HTrace, asynchronous trace handling
via the Hadoop metrics2 framework, and reading/writing
traces to a Phoenix table.

Tracing currently does NOT support Hadoop1 (though the build
itself still works against Hadoop1).

The default build now targets hadoop2 rather than hadoop1 (particularly as
hadoop1 is now a second-class citizen).
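
For context, a minimal sketch of how a client opts a connection into tracing,
based on the getConnectionWithTracingFrequency() helper added in
BaseTracingTestIT below (the JDBC URL and table name are illustrative
assumptions, not part of this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    import org.apache.phoenix.trace.util.Tracing;

    Properties props = new Properties();
    // trace every request made on this connection; Frequency.NEVER turns tracing off
    Tracing.setSampling(props, Tracing.Frequency.ALWAYS);
    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
    conn.createStatement().executeQuery("SELECT * FROM MY_TABLE"); // traced request
    conn.close();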


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b7f46c10
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b7f46c10
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b7f46c10

Branch: refs/heads/master
Commit: b7f46c1051de3e23630dccb82677a0a16985f27c
Parents: 9185f76
Author: Jesse Yates <jya...@apache.org>
Authored: Fri Jun 6 16:11:32 2014 -0700
Committer: Jesse Yates <jya...@apache.org>
Committed: Mon Jul 28 06:37:49 2014 -0700

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |  47 +--
 .../apache/phoenix/trace/BaseTracingTestIT.java | 117 ++++++
 .../phoenix/trace/DelegatingConnection.java     | 328 +++++++++++++++
 .../phoenix/trace/DisableableMetricsWriter.java |  83 ++++
 .../trace/Hadoop1TracingTestEnabler.java        |  86 ++++
 .../apache/phoenix/trace/PhoenixMetricImpl.java |  44 ++
 .../phoenix/trace/PhoenixMetricRecordImpl.java  |  71 ++++
 .../trace/PhoenixTableMetricsWriterIT.java      | 119 ++++++
 .../apache/phoenix/trace/PhoenixTagImpl.java    |  52 +++
 .../phoenix/trace/PhoenixTracingEndToEndIT.java | 401 +++++++++++++++++++
 .../apache/phoenix/trace/TraceReaderTest.java   | 181 +++++++++
 .../org/apache/phoenix/call/CallRunner.java     |  66 +++
 .../org/apache/phoenix/call/CallWrapper.java    |  29 ++
 .../coprocessor/BaseScannerRegionObserver.java  |  36 +-
 .../coprocessor/DelegateRegionScanner.java      |  78 ++++
 .../apache/phoenix/execute/BasicQueryPlan.java  |  17 +-
 .../apache/phoenix/execute/MutationState.java   |  24 +-
 .../org/apache/phoenix/hbase/index/Indexer.java |  39 ++
 .../phoenix/iterate/ParallelIterators.java      |   5 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  38 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  25 +-
 .../trace/PhoenixTableMetricsWriter.java        | 255 ++++++++++++
 .../org/apache/phoenix/trace/TraceReader.java   | 375 +++++++++++++++++
 .../apache/phoenix/trace/TracingIterator.java   |  58 +++
 .../trace/util/ConfigurationAdapter.java        |  56 +++
 .../org/apache/phoenix/trace/util/NullSpan.java | 112 ++++++
 .../org/apache/phoenix/trace/util/Tracing.java  | 282 +++++++++++++
 .../phoenix/util/PhoenixContextExecutor.java    |  23 ++
 .../java/org/apache/phoenix/util/QueryUtil.java |  29 ++
 .../test/resources/hadoop-metrics2.properties   |  25 ++
 .../src/test/resources/log4j.properties         |   4 +-
 phoenix-hadoop-compat/pom.xml                   |  31 +-
 .../org/apache/phoenix/metrics/MetricInfo.java  |  51 +++
 .../org/apache/phoenix/metrics/Metrics.java     |  39 ++
 .../apache/phoenix/metrics/MetricsManager.java  |  58 +++
 .../apache/phoenix/metrics/MetricsWriter.java   |  31 ++
 .../phoenix/metrics/PhoenixAbstractMetric.java  |  30 ++
 .../phoenix/metrics/PhoenixMetricTag.java       |  27 ++
 .../phoenix/metrics/PhoenixMetricsRecord.java   |  35 ++
 .../phoenix/trace/PhoenixSpanReceiver.java      |  26 ++
 .../phoenix/trace/TestableMetricsWriter.java    |  30 ++
 .../org/apache/phoenix/trace/TracingCompat.java |  94 +++++
 .../org/apache/phoenix/metrics/LoggingSink.java |  56 +++
 .../phoenix/metrics/TracingTestCompat.java      |  45 +++
 phoenix-hadoop2-compat/pom.xml                  |  47 ++-
 .../phoenix/metrics/MetricsManagerImpl.java     |  71 ++++
 .../apache/phoenix/trace/MetricsInfoImpl.java   |  63 +++
 .../phoenix/trace/PhoenixMetricsWriter.java     | 176 ++++++++
 .../apache/phoenix/trace/TraceMetricSource.java | 192 +++++++++
 .../org.apache.phoenix.metrics.MetricsManager   |   1 +
 ...org.apache.phoenix.trace.PhoenixSpanReceiver |   1 +
 ...g.apache.phoenix.trace.TestableMetricsWriter |   1 +
 .../metrics2/impl/ExposedMetricCounterLong.java |  35 ++
 .../metrics2/impl/ExposedMetricsRecordImpl.java |  43 ++
 .../metrics2/lib/ExposedMetricsInfoImpl.java    |  32 ++
 .../phoenix/trace/PhoenixMetricsWriterTest.java | 142 +++++++
 .../org/apache/phoenix/trace/TracingTest.java   |  34 ++
 pom.xml                                         |  40 +-
 58 files changed, 4479 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index f5c3ace..cfdee95 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -299,25 +299,23 @@
     <dependency>
       <groupId>commons-collections</groupId>
       <artifactId>commons-collections</artifactId>
-      <version>${collections.version}</version>
-    </dependency>    
+    </dependency>
   </dependencies>
 
   <profiles>
-    <!-- Profile for building against Hadoop 1. Active by default. Not used if another
-      Hadoop profile is specified with mvn -Dhadoop.profile=foo -->
+    <!-- Profile for building against Hadoop 1. Activate using: mvn -Dhadoop.profile=1 -->
     <profile>
       <id>hadoop-1</id>
       <activation>
         <property>
-          <name>!hadoop.profile</name>
+          <name>hadoop.profile</name>
+          <value>1</value>
         </property>
       </activation>
       <dependencies>
           <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-testing-util</artifactId>
-            <version>${hbase-hadoop1.version}</version>
             <exclusions>
               <exclusion>
                 <groupId>org.jruby</groupId>
@@ -341,22 +339,18 @@
           <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-common</artifactId>
-            <version>${hbase-hadoop1.version}</version>
           </dependency>
           <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-protocol</artifactId>
-            <version>${hbase-hadoop1.version}</version>
           </dependency>
           <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
-            <version>${hbase-hadoop1.version}</version>
           </dependency>
           <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-core</artifactId>
-            <version>${hadoop-one.version}</version>
             <exclusions>
               <exclusion>
                 <groupId>hsqldb</groupId>
@@ -383,27 +377,29 @@
           <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-test</artifactId>
-            <version>${hadoop-one.version}</version>
             <optional>true</optional>
             <scope>test</scope>
           </dependency>
+          <dependency>
+            <groupId>org.apache.phoenix</groupId>
+            <artifactId>phoenix-hadoop1-compat</artifactId>
+          </dependency>
       </dependencies>
     </profile>
 
-    <!-- Profile for building against Hadoop 2. Activate using: mvn -Dhadoop.profile=2 -->
+    <!-- Profile for building against Hadoop 2. Active by default. Not used if another
+      Hadoop profile is specified with mvn -Dhadoop.profile=foo -->
     <profile>
       <id>hadoop-2</id>
       <activation>
         <property>
-          <name>hadoop.profile</name>
-          <value>2</value>
+          <name>!hadoop.profile</name>
         </property>
       </activation>
       <dependencies>
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-testing-util</artifactId>
-          <version>${hbase-hadoop2.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
@@ -415,23 +411,14 @@
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-common</artifactId>
-          <version>${hbase-hadoop2.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hbase</groupId>
-          <artifactId>hbase-common</artifactId>
-          <version>${hbase-hadoop2.version}</version>
-          <type>test-jar</type>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-protocol</artifactId>
-          <version>${hbase-hadoop2.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-client</artifactId>
-          <version>${hbase-hadoop2.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
@@ -447,13 +434,11 @@
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-hadoop-compat</artifactId>
-          <version>${hbase-hadoop2.version}</version>
           <scope>test</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase-hadoop-compat</artifactId>
-          <version>${hbase-hadoop2.version}</version>
           <type>test-jar</type>
           <scope>test</scope>
         </dependency>
@@ -486,6 +471,16 @@
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-minicluster</artifactId>
         </dependency>
+        <dependency>
+          <groupId>org.apache.phoenix</groupId>
+          <artifactId>phoenix-hadoop2-compat</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.phoenix</groupId>
+          <artifactId>phoenix-hadoop2-compat</artifactId>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
new file mode 100644
index 0000000..59cd871
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.metrics.PhoenixAbstractMetric;
+import org.apache.phoenix.metrics.PhoenixMetricTag;
+import org.apache.phoenix.metrics.PhoenixMetricsRecord;
+import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.trace.util.Tracing.Frequency;
+
+/**
+ * Base test for tracing tests - helps manage getting tracing/non-tracing
+ * connections, as well as any supporting utils.
+ */
+public class BaseTracingTestIT extends BaseHBaseManagedTimeIT {
+    private static final Log LOG = LogFactory.getLog(BaseTracingTestIT.class);
+
+    /**
+     * Hadoop1 doesn't yet support tracing (it needs metrics library support), so we just skip
+     * those tests for the moment.
+     * @return <tt>true</tt> if the test should exit because some necessary classes are missing,
+     *         or <tt>false</tt> if the tests can continue normally
+     */
+    static boolean shouldEarlyExitForHadoop1Test() {
+        try {
+            // get a receiver for the spans
+            TracingCompat.newTraceMetricSource();
+            // which also needs to be a source for the metrics system
+            Metrics.getManager();
+            return false;
+        } catch (RuntimeException e) {
+            LOG.error("Shouldn't run test because can't instantiate necessary 
metrics/tracing classes!");
+        }
+
+        return true;
+    }
+
+    public static Connection getConnectionWithoutTracing() throws SQLException {
+        Properties props = new Properties(TEST_PROPERTIES);
+        return getConnectionWithoutTracing(props);
+    }
+
+    public static Connection getConnectionWithoutTracing(Properties props) throws SQLException {
+        Connection conn = getConnectionWithTracingFrequency(props, Frequency.NEVER);
+        conn.setAutoCommit(false);
+        return conn;
+    }
+
+    public static Connection getTracingConnection() throws Exception {
+        Properties props = new Properties(TEST_PROPERTIES);
+        return getConnectionWithTracingFrequency(props, Tracing.Frequency.ALWAYS);
+    }
+
+    public static Connection getConnectionWithTracingFrequency(Properties props,
+            Tracing.Frequency frequency) throws SQLException {
+        Tracing.setSampling(props, frequency);
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    public static PhoenixMetricsRecord createRecord(long traceid, long parentid, long spanid,
+            String desc, long startTime, long endTime, String hostname, String... tags) {
+        PhoenixMetricRecordImpl record =
+                new PhoenixMetricRecordImpl(TracingCompat.getTraceMetricName(traceid), desc);
+        PhoenixAbstractMetric span = new PhoenixMetricImpl(MetricInfo.SPAN.traceName, spanid);
+        record.addMetric(span);
+
+        PhoenixAbstractMetric parent = new PhoenixMetricImpl(MetricInfo.PARENT.traceName, parentid);
+        record.addMetric(parent);
+
+        PhoenixAbstractMetric start = new PhoenixMetricImpl(MetricInfo.START.traceName, startTime);
+        record.addMetric(start);
+
+        PhoenixAbstractMetric end = new PhoenixMetricImpl(MetricInfo.END.traceName, endTime);
+        record.addMetric(end);
+
+        int tagCount = 0;
+        for (String annotation : tags) {
+            PhoenixMetricTag tag =
+                    new PhoenixTagImpl(MetricInfo.ANNOTATION.traceName,
+                            Integer.toString(tagCount++), annotation);
+            record.addTag(tag);
+        }
+        String hostnameValue = "host-name.value";
+        PhoenixMetricTag hostnameTag =
+                new PhoenixTagImpl(MetricInfo.HOSTNAME.traceName, "", hostnameValue);
+        record.addTag(hostnameTag);
+
+        return record;
+    }
+}
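
As a quick usage sketch (the table name and statement are illustrative, not
part of this patch), a test built on these helpers looks like:

    Connection traced = BaseTracingTestIT.getTracingConnection();
    traced.createStatement().execute("UPSERT INTO MY_TABLE VALUES ('key1', 1)");
    traced.commit(); // spans from this request are sampled and handed to the metrics system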

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
new file mode 100644
index 0000000..261522d
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+
+import org.apache.phoenix.jdbc.Jdbc7Shim;
+
+/**
+ * Simple {@link Connection} that just delegates to an underlying {@link Connection}.
+ * @param <D> delegate type that is both a {@link Connection} and a {@link Jdbc7Shim#Connection}
+ */
+public class DelegatingConnection<D extends Connection & Jdbc7Shim.Connection> implements
+        Connection, Jdbc7Shim.Connection {
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return conn.unwrap(iface);
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return conn.isWrapperFor(iface);
+  }
+
+  @Override
+  public Statement createStatement() throws SQLException {
+    return conn.createStatement();
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return conn.prepareStatement(sql);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return conn.prepareCall(sql);
+  }
+
+  @Override
+  public String nativeSQL(String sql) throws SQLException {
+    return conn.nativeSQL(sql);
+  }
+
+  @Override
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+    conn.setAutoCommit(autoCommit);
+  }
+
+  @Override
+  public boolean getAutoCommit() throws SQLException {
+    return conn.getAutoCommit();
+  }
+
+  @Override
+  public void commit() throws SQLException {
+    conn.commit();
+  }
+
+  @Override
+  public void rollback() throws SQLException {
+    conn.rollback();
+  }
+
+  @Override
+  public void close() throws SQLException {
+    conn.close();
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return conn.isClosed();
+  }
+
+  @Override
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return conn.getMetaData();
+  }
+
+  @Override
+  public void setReadOnly(boolean readOnly) throws SQLException {
+    conn.setReadOnly(readOnly);
+  }
+
+  @Override
+  public boolean isReadOnly() throws SQLException {
+    return conn.isReadOnly();
+  }
+
+  @Override
+  public void setCatalog(String catalog) throws SQLException {
+    conn.setCatalog(catalog);
+  }
+
+  @Override
+  public String getCatalog() throws SQLException {
+    return conn.getCatalog();
+  }
+
+  @Override
+  public void setTransactionIsolation(int level) throws SQLException {
+    conn.setTransactionIsolation(level);
+  }
+
+  @Override
+  public int getTransactionIsolation() throws SQLException {
+    return conn.getTransactionIsolation();
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return conn.getWarnings();
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    conn.clearWarnings();
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+    return conn.createStatement(resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public PreparedStatement
+      prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+    return conn.prepareStatement(sql, resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return conn.prepareCall(sql, resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return conn.getTypeMap();
+  }
+
+  @Override
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+    conn.setTypeMap(map);
+  }
+
+  @Override
+  public void setHoldability(int holdability) throws SQLException {
+    conn.setHoldability(holdability);
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    return conn.getHoldability();
+  }
+
+  @Override
+  public Savepoint setSavepoint() throws SQLException {
+    return conn.setSavepoint();
+  }
+
+  @Override
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return conn.setSavepoint(name);
+  }
+
+  @Override
+  public void rollback(Savepoint savepoint) throws SQLException {
+    conn.rollback(savepoint);
+  }
+
+  @Override
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+    conn.releaseSavepoint(savepoint);
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency,
+      int resultSetHoldability) throws SQLException {
+    return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType,
+      int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
+      int resultSetHoldability) throws SQLException {
+    return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return conn.prepareStatement(sql, autoGeneratedKeys);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return conn.prepareStatement(sql, columnIndexes);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return conn.prepareStatement(sql, columnNames);
+  }
+
+  @Override
+  public Clob createClob() throws SQLException {
+    return conn.createClob();
+  }
+
+  @Override
+  public Blob createBlob() throws SQLException {
+    return conn.createBlob();
+  }
+
+  @Override
+  public NClob createNClob() throws SQLException {
+    return conn.createNClob();
+  }
+
+  @Override
+  public SQLXML createSQLXML() throws SQLException {
+    return conn.createSQLXML();
+  }
+
+  @Override
+  public boolean isValid(int timeout) throws SQLException {
+    return conn.isValid(timeout);
+  }
+
+  @Override
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+    conn.setClientInfo(name, value);
+  }
+
+  @Override
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+    conn.setClientInfo(properties);
+  }
+
+  @Override
+  public String getClientInfo(String name) throws SQLException {
+    return conn.getClientInfo(name);
+  }
+
+  @Override
+  public Properties getClientInfo() throws SQLException {
+    return conn.getClientInfo();
+  }
+
+  @Override
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return conn.createArrayOf(typeName, elements);
+  }
+
+  @Override
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return conn.createStruct(typeName, attributes);
+  }
+
+    private D conn;
+
+    public DelegatingConnection(D conn) {
+        this.conn = conn;
+    }
+
+    @Override
+    public void setSchema(String schema) throws SQLException {
+        conn.setSchema(schema);
+    }
+
+    @Override
+    public String getSchema() throws SQLException {
+        return conn.getSchema();
+    }
+
+    @Override
+    public void abort(Executor executor) throws SQLException {
+        conn.abort(executor);
+    }
+
+    @Override
+    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+        conn.setNetworkTimeout(executor, milliseconds);
+    }
+
+    @Override
+    public int getNetworkTimeout() throws SQLException {
+        return conn.getNetworkTimeout();
+    }
+}
\ No newline at end of file
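
Note: PhoenixTracingEndToEndIT below constructs a CountDownConnection that is
not shown in this excerpt. Presumably it is a small DelegatingConnection
subclass along these lines (a sketch under that assumption, not the committed
code; the raw supertype matches the test's constructor call):

    public class CountDownConnection extends DelegatingConnection {
        private final CountDownLatch commits;

        public CountDownConnection(Connection conn, CountDownLatch commits) {
            super(conn);
            this.commits = commits;
        }

        @Override
        public void commit() throws SQLException {
            super.commit();
            // let waiting tests know a commit reached the underlying connection
            commits.countDown();
        }
    }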

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java
new file mode 100644
index 0000000..a054bf2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.metrics.MetricsWriter;
+import org.apache.phoenix.metrics.PhoenixMetricsRecord;
+
+/**
+ * {@link MetricsWriter} wrapper that can be disabled and re-enabled, for testing.
+ */
+public class DisableableMetricsWriter implements MetricsWriter {
+
+    private static final Log LOG = LogFactory.getLog(DisableableMetricsWriter.class);
+    private PhoenixTableMetricsWriter writer;
+    private AtomicBoolean disabled = new AtomicBoolean(false);
+
+    public DisableableMetricsWriter(PhoenixTableMetricsWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void initialize() {
+        if (this.disabled.get()) return;
+        writer.initialize();
+    }
+
+    @Override
+    public void flush() {
+        if (this.disabled.get()) {
+            clear();
+            return;
+        }
+        writer.flush();
+
+    }
+
+    @Override
+    public void addMetrics(PhoenixMetricsRecord record) {
+        if (this.disabled.get()) return;
+        writer.addMetrics(record);
+    }
+
+    public void disable() {
+        this.disabled.set(true);
+    }
+
+    public void enable() {
+        this.disabled.set(false);
+    }
+
+    public void clear() {
+        // clear any pending writes
+        try {
+            writer.clearForTesting();
+        } catch (SQLException e) {
+            LOG.error("Couldn't clear the delgate writer when flush called and 
disabled", e);
+        }
+    }
+
+    public PhoenixTableMetricsWriter getDelegate() {
+        return this.writer;
+    }
+}
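
For reference, the end-to-end test below swaps the underlying connection by
bracketing the change with disable()/enable(), exactly as in its
replaceWriterConnection() helper:

    sink.disable();                          // stop forwarding metrics
    sink.getDelegate().initForTesting(conn); // repoint the table writer
    sink.enable();                           // resume forwarding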

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java
new file mode 100644
index 0000000..07e2305
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.InitializationError;
+
+/**
+ * Test runner for classes that depend on Hadoop1 compatibility features that may not be
+ * present at runtime
+ */
+public class Hadoop1TracingTestEnabler extends BlockJUnit4ClassRunner {
+
+    public Hadoop1TracingTestEnabler(Class<?> klass) throws InitializationError {
+        super(klass);
+    }
+
+    @Override
+    public void runChild(FrameworkMethod method, RunNotifier notifier) {
+        // if the annotation is present at the class level, we can disable at the class level;
+        // otherwise we just check the per-method annotation
+        Hadoop1Disabled condition =
+                getTestClass().getJavaClass().getAnnotation(Hadoop1Disabled.class);
+        if (condition == null) {
+            condition = method.getAnnotation(Hadoop1Disabled.class);
+        }
+
+        // if this has the flag, then we want to disable it if hadoop1 is not enabled for that
+        // feature
+        if (condition != null && getEnabled(condition.value())) {
+            super.runChild(method, notifier);
+        } else {
+            notifier.fireTestIgnored(describeChild(method));
+        }
+    }
+
+    /**
+     * Simple check that just uses if-else logic. We can move to something more complex and
+     * policy-based later, if this gets more complex.
+     * @param feature name of the feature to check
+     * @return <tt>true</tt> if the test method is enabled for the given feature, <tt>false</tt>
+     *         otherwise
+     */
+    private boolean getEnabled(String feature) {
+        if (feature.equals("tracing")) {
+            return !BaseTracingTestIT.shouldEarlyExitForHadoop1Test();
+        }
+        return true;
+    }
+
+    /**
+     * Marker that a class/method should be disabled if hadoop1 features are not enabled. It
+     * takes a value for the Hadoop1 feature on which this class/method depends; for instance,
+     * "tracing" is not supported in Hadoop1 (yet).
+     */
+    @Target({ ElementType.TYPE, ElementType.METHOD })
+    @Retention(RetentionPolicy.RUNTIME)
+    public static @interface Hadoop1Disabled {
+        String value();
+    }
+}
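
Usage note: the tracing ITs below opt into this runner via annotations, e.g.

    @RunWith(Hadoop1TracingTestEnabler.class)
    @Hadoop1Disabled("tracing")
    public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { ... }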

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java
new file mode 100644
index 0000000..985504f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import org.apache.phoenix.metrics.PhoenixAbstractMetric;
+
+/**
+ * Simple metric implementation for testing
+ */
+public class PhoenixMetricImpl implements PhoenixAbstractMetric {
+
+    private String name;
+    private Number value;
+
+    public PhoenixMetricImpl(String name, Number value) {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public Number value() {
+        return value;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java
new file mode 100644
index 0000000..45cabf0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.phoenix.metrics.PhoenixAbstractMetric;
+import org.apache.phoenix.metrics.PhoenixMetricTag;
+import org.apache.phoenix.metrics.PhoenixMetricsRecord;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Simple {@link PhoenixMetricsRecord} implementation for testing
+ */
+public class PhoenixMetricRecordImpl implements PhoenixMetricsRecord {
+
+    private String name;
+    private String description;
+    private final List<PhoenixAbstractMetric> metrics = Lists.newArrayList();
+    private final List<PhoenixMetricTag> tags = Lists.newArrayList();
+
+    public PhoenixMetricRecordImpl(String name, String description) {
+        this.name = name;
+        this.description = description;
+    }
+
+    public void addMetric(PhoenixAbstractMetric metric) {
+        this.metrics.add(metric);
+    }
+
+    public void addTag(PhoenixMetricTag tag) {
+        this.tags.add(tag);
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    public String description() {
+        return this.description;
+    }
+
+    @Override
+    public Iterable<PhoenixAbstractMetric> metrics() {
+        return metrics;
+    }
+
+    @Override
+    public Collection<PhoenixMetricTag> tags() {
+        return tags;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
new file mode 100644
index 0000000..b1544e8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.util.Collection;
+
+import org.apache.phoenix.metrics.PhoenixMetricsRecord;
+import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled;
+import org.apache.phoenix.trace.TraceReader.SpanInfo;
+import org.apache.phoenix.trace.TraceReader.TraceHolder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * Test that the logging sink stores the expected metrics/stats
+ */
+@RunWith(Hadoop1TracingTestEnabler.class)
+@Hadoop1Disabled("tracing")
+public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT {
+
+    /**
+     * IT should create the target table if it hasn't been created yet, but not fail if the table
+     * has already been created
+     * @throws Exception on failure
+     */
+    @Test
+    public void testCreatesTable() throws Exception {
+        PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter();
+        Connection conn = getConnectionWithoutTracing();
+        sink.initForTesting(conn);
+
+        // check for existence of the tracing table
+        try {
+            String ddl = "CREATE TABLE " + 
TracingCompat.DEFAULT_STATS_TABLE_NAME;
+            conn.createStatement().execute(ddl);
+            fail("Table " + TracingCompat.DEFAULT_STATS_TABLE_NAME
+                    + " was not created by the metrics sink");
+        } catch (Exception e) {
+            // expected
+        }
+
+        // initialize sink again, which should attempt to create the table, but not fail
+        try {
+            sink.initForTesting(conn);
+        } catch (Exception e) {
+            fail("Initialization shouldn't fail if table already exists!");
+        }
+    }
+
+    /**
+     * Simple metrics writing and reading check that uses the standard wrapping in the
+     * {@link PhoenixMetricsWriter}
+     * @throws Exception on failure
+     */
+    @Test
+    public void writeMetrics() throws Exception {
+        // hook up a phoenix sink
+        PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter();
+        Connection conn = getConnectionWithoutTracing();
+        sink.initForTesting(conn);
+
+        // create a simple metrics record
+        long traceid = 987654;
+        String description = "Some generic trace";
+        long spanid = 10;
+        long parentid = 11;
+        long startTime = 12;
+        long endTime = 13;
+        String annotation = "test annotation for a span";
+        String hostnameValue = "host-name.value";
+        PhoenixMetricsRecord record =
+                createRecord(traceid, parentid, spanid, description, startTime, endTime,
+                    hostnameValue, annotation);
+
+        // actually write the record to the table
+        sink.addMetrics(record);
+        sink.flush();
+
+        // make sure we only get the expected stat entry (matching the trace id); otherwise we
+        // could read the stats for the update as well
+        TraceReader reader = new TraceReader(conn);
+        Collection<TraceHolder> traces = reader.readAll(10);
+        assertEquals("Wrong number of traces in the tracing table", 1, 
traces.size());
+
+        // validate trace
+        TraceHolder trace = traces.iterator().next();
+        // we are just going to get an orphan span b/c we don't send in a parent
+        assertEquals("Didn't get expected orphaned spans!" + trace.orphans, 1, trace.orphans.size());
+
+        assertEquals(traceid, trace.traceid);
+        SpanInfo spanInfo = trace.orphans.get(0);
+        assertEquals(description, spanInfo.description);
+        assertEquals(parentid, spanInfo.getParentIdForTesting());
+        assertEquals(startTime, spanInfo.start);
+        assertEquals(endTime, spanInfo.end);
+        assertEquals(hostnameValue, spanInfo.hostname);
+        assertEquals("Wrong number of tags", 0, spanInfo.tagCount);
+        assertEquals("Wrong number of annotations", 1, 
spanInfo.annotationCount);
+    }
+}
\ No newline at end of file
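
As a usage sketch, reading traces back out of the stats table goes through the
new TraceReader, as in the IT above (the loop body is illustrative):

    TraceReader reader = new TraceReader(conn);
    Collection<TraceHolder> traces = reader.readAll(10); // read back up to 10 traces
    for (TraceHolder trace : traces) {
        System.out.println("Got trace: " + trace); // TraceHolder#toString dumps the spans
    }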

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java
new file mode 100644
index 0000000..c8e2219
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import org.apache.phoenix.metrics.PhoenixMetricTag;
+
+/**
+ * Simple Tag implementation for testing
+ */
+public class PhoenixTagImpl implements PhoenixMetricTag {
+
+    private final String name;
+    private final String description;
+    private final String value;
+
+    public PhoenixTagImpl(String name, String description, String value) {
+        super();
+        this.name = name;
+        this.description = description;
+        this.value = value;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public String description() {
+        return description;
+    }
+
+    @Override
+    public String value() {
+        return value;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
new file mode 100644
index 0000000..955d640
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.metrics.Metrics;
+import org.apache.phoenix.metrics.TracingTestCompat;
+import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled;
+import org.apache.phoenix.trace.TraceReader.SpanInfo;
+import org.apache.phoenix.trace.TraceReader.TraceHolder;
+import org.cloudera.htrace.Sampler;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.SpanReceiver;
+import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * Test that the logging sink stores the expected metrics/stats
+ */
+@RunWith(Hadoop1TracingTestEnabler.class)
+@Hadoop1Disabled("tracing")
+public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixTracingEndToEndIT.class);
+    private static final int MAX_RETRIES = 10;
+    private final String table = "ENABLED_FOR_LOGGING";
+    private final String index = "ENABALED_FOR_LOGGING_INDEX";
+
+    private static DisableableMetricsWriter sink;
+
+    @BeforeClass
+    public static void setupMetrics() throws Exception {
+        if (shouldEarlyExitForHadoop1Test()) {
+            return;
+        }
+        PhoenixTableMetricsWriter pWriter = new PhoenixTableMetricsWriter();
+        Connection conn = getConnectionWithoutTracing();
+        pWriter.initForTesting(conn);
+        sink = new DisableableMetricsWriter(pWriter);
+
+        TracingTestCompat.registerSink(sink);
+    }
+
+    @After
+    public void cleanup() {
+        sink.disable();
+        sink.clear();
+        sink.enable();
+
+        // LISTENABLE.clearListeners();
+    }
+
+    private static void waitForCommit(CountDownLatch latch) throws SQLException {
+        Connection conn = new CountDownConnection(getConnectionWithoutTracing(), latch);
+        replaceWriterConnection(conn);
+    }
+
+    private static void replaceWriterConnection(Connection conn) throws SQLException {
+        // disable the writer
+        sink.disable();
+
+        // swap the connection for one that listens
+        sink.getDelegate().initForTesting(conn);
+
+        // enable the writer
+        sink.enable();
+    }
+
+    /**
+     * Simple test that we can correctly write spans to the phoenix table
+     * @throws Exception on failure
+     */
+    @Test
+    public void testWriteSpans() throws Exception {
+        // get a receiver for the spans
+        SpanReceiver receiver = TracingCompat.newTraceMetricSource();
+        // which also needs to be a source for the metrics system
+        Metrics.getManager().registerSource("testWriteSpans-source", "source for testWriteSpans",
+            receiver);
+
+        // watch our sink so we know when commits happen
+        CountDownLatch latch = new CountDownLatch(1);
+        waitForCommit(latch);
+
+        // write some spans
+        TraceScope trace = Trace.startSpan("Start write test", Sampler.ALWAYS);
+        Span span = trace.getSpan();
+
+        // add a child with some annotations
+        Span child = span.child("child 1");
+        child.addTimelineAnnotation("timeline annotation");
+        TracingCompat.addAnnotation(child, "test annotation", 10);
+        child.stop();
+
+        // sleep a little bit to get some time difference
+        Thread.sleep(100);
+
+        trace.close();
+
+        // pass the trace on
+        receiver.receiveSpan(span);
+
+        // wait for the tracer to actually do the write
+        latch.await();
+
+        // look for the writes to make sure they were made
+        Connection conn = getConnectionWithoutTracing();
+        checkStoredTraces(conn, new TraceChecker() {
+            public boolean foundTrace(TraceHolder trace, SpanInfo info) {
+                if (info.description.equals("child 1")) {
+                    assertEquals("Not all annotations present", 1, 
info.annotationCount);
+                    assertEquals("Not all tags present", 1, info.tagCount);
+                    boolean found = false;
+                    for (String annotation : info.annotations) {
+                        if (annotation.startsWith("test annotation")) {
+                            found = true;
+                        }
+                    }
+                    assertTrue("Missing the annotations in span: " + info, 
found);
+                    found = false;
+                    for (String tag : info.tags) {
+                        if (tag.endsWith("timeline annotation")) {
+                            found = true;
+                        }
+                    }
+                    assertTrue("Missing the tags in span: " + info, found);
+                    return true;
+                }
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Test that spans will actually go into this sink and be written on both sides of the wire,
+     * through the indexing code.
+     * @throws Exception
+     */
+    @Test
+    public void testClientServerIndexingTracing() throws Exception {
+        // one call for client side, one call for server side
+        final CountDownLatch updated = new CountDownLatch(2);
+        waitForCommit(updated);
+
+        // separate connection so we don't create extra traces
+        Connection conn = getConnectionWithoutTracing();
+        createTestTable(conn, true);
+
+        // trace the requests we send
+        Connection traceable = getTracingConnection();
+        LOG.debug("Doing dummy the writes to the tracked table");
+        String insert = "UPSERT INTO " + table + " VALUES (?, ?)";
+        PreparedStatement stmt = traceable.prepareStatement(insert);
+        stmt.setString(1, "key1");
+        stmt.setLong(2, 1);
+        // this first trace just does a simple open/close of the span. It's not doing anything
+        // terribly interesting because we aren't auto-committing on the connection, so it just
+        // updates the mutation state and returns.
+        stmt.execute();
+        stmt.setString(1, "key2");
+        stmt.setLong(2, 2);
+        stmt.execute();
+        traceable.commit();
+
+        // wait for the latch to countdown, as the metrics system is time-based
+        LOG.debug("Waiting for latch to complete!");
+        updated.await(200, TimeUnit.SECONDS); // should be way more than GC pauses
+
+        // read the traces back out
+
+        /* Expected:
+         * 1. Single element trace - for first PreparedStatement#execute span
+         * 2. Two element trace for second PreparedStatement#execute span
+         *  a. execute call
+         *  b. metadata lookup*
+         * 3. Commit trace.
+         *  a. Committing to tables
+         *    i. Committing to single table
+         *    ii. hbase batch write*
+         *    i.I. span on server
+         *    i.II. building index updates
+         *    i.III. waiting for latch
+         * where '*' is a generically named thread (e.g. phoenix-1-thread-X)
+         */
+        boolean indexingCompleted = checkStoredTraces(conn, new TraceChecker() {
+            public boolean foundTrace(TraceHolder trace, SpanInfo span) {
+                String traceInfo = trace.toString();
+                // skip logging traces that are just traces about tracing
+                if (traceInfo.contains(TracingCompat.DEFAULT_STATS_TABLE_NAME)) {
+                    return false;
+                }
+                if (traceInfo.contains("Completing index")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        assertTrue("Never found indexing updates", indexingCompleted);
+    }
+
+    private void createTestTable(Connection conn, boolean withIndex) throws SQLException {
+        // create a dummy table
+        String ddl =
+                "create table if not exists " + table + "(" + "k varchar not 
null, " + "c1 bigint"
+                        + " CONSTRAINT pk PRIMARY KEY (k))";
+        conn.createStatement().execute(ddl);
+
+        // early exit if we don't need to create an index
+        if (!withIndex) {
+            return;
+        }
+        // create an index on the table - we know indexing has some basic tracing
+        ddl = "CREATE INDEX IF NOT EXISTS " + index + " on " + table + " (c1)";
+        conn.createStatement().execute(ddl);
+        conn.commit();
+    }
+
+    @Test
+    public void testScanTracing() throws Exception {
+        // separate connections to minimize amount of traces that are generated
+        Connection traceable = getTracingConnection();
+        Connection conn = getConnectionWithoutTracing();
+
+        // one call for client side, one call for server side
+        CountDownLatch updated = new CountDownLatch(2);
+        waitForCommit(updated);
+
+        // create a dummy table
+        createTestTable(conn, false);
+
+        // update the table, but don't trace these, to simplify the traces we read
+        LOG.debug("Doing the dummy writes to the tracked table");
+        String insert = "UPSERT INTO " + table + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(insert);
+        stmt.setString(1, "key1");
+        stmt.setLong(2, 1);
+        stmt.execute();
+        conn.commit();
+        conn.rollback();
+
+        // setup for next set of updates
+        stmt.setString(1, "key2");
+        stmt.setLong(2, 2);
+        stmt.execute();
+        conn.commit();
+        conn.rollback();
+
+        // do a scan of the table
+        String read = "SELECT * FROM " + table;
+        ResultSet results = traceable.createStatement().executeQuery(read);
+        assertTrue("Didn't get first result", results.next());
+        assertTrue("Didn't get second result", results.next());
+        results.close();
+
+        assertTrue("Get expected updates to trace table", updated.await(200, 
TimeUnit.SECONDS));
+        // don't trace reads either
+        boolean tracingComplete = checkStoredTraces(conn, new TraceChecker() {
+
+            @Override
+            public boolean foundTrace(TraceHolder currentTrace) {
+                String traceInfo = currentTrace.toString();
+                return traceInfo.contains("Parallel scanner");
+            }
+        });
+        assertTrue("Didn't find the parallel scanner in the tracing", 
tracingComplete);
+    }
+
+    @Test
+    public void testScanTracingOnServer() throws Exception {
+        // separate connections to minimize the number of traces that are generated
+        Connection traceable = getTracingConnection();
+        Connection conn = getConnectionWithoutTracing();
+
+        // one call for client side, one call for server side
+        CountDownLatch updated = new CountDownLatch(2);
+        waitForCommit(updated);
+
+        // create a dummy table
+        createTestTable(conn, false);
+
+        // update the table, but don't trace these, to simplify the traces we read
+        LOG.debug("Doing the dummy writes to the tracked table");
+        String insert = "UPSERT INTO " + table + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(insert);
+        stmt.setString(1, "key1");
+        stmt.setLong(2, 1);
+        stmt.execute();
+        conn.commit();
+        conn.rollback();
+
+        // setup for next set of updates
+        stmt.setString(1, "key2");
+        stmt.setLong(2, 2);
+        stmt.execute();
+        conn.commit();
+        conn.rollback();
+
+        // do a scan of the table
+        String read = "SELECT COUNT(*) FROM " + table;
+        ResultSet results = traceable.createStatement().executeQuery(read);
+        assertTrue("Didn't get count result", results.next());
+        // make sure we got the expected count
+        assertEquals("Didn't get the expected number of row", 2, 
results.getInt(1));
+        results.close();
+
+        assertTrue("Get expected updates to trace table", updated.await(200, 
TimeUnit.SECONDS));
+        // read the traces back with the non-tracing connection, so the reads aren't traced either
+        boolean found = checkStoredTraces(conn, new TraceChecker() {
+            @Override
+            public boolean foundTrace(TraceHolder trace) {
+                String traceInfo = trace.toString();
+                return traceInfo.contains(BaseScannerRegionObserver.SCANNER_OPENED_TRACE_INFO);
+            }
+        });
+        assertTrue("Didn't find the parallel scanner in the tracing", found);
+    }
+
+    private boolean checkStoredTraces(Connection conn, TraceChecker checker) throws Exception {
+        TraceReader reader = new TraceReader(conn);
+        int retries = 0;
+        boolean found = false;
+        outer: while (retries < MAX_RETRIES) {
+            Collection<TraceHolder> traces = reader.readAll(100);
+            for (TraceHolder trace : traces) {
+                LOG.info("Got trace: " + trace);
+                found = checker.foundTrace(trace);
+                if (found) {
+                    break outer;
+                }
+                for (SpanInfo span : trace.spans) {
+                    found = checker.foundTrace(trace, span);
+                    if (found) {
+                        break outer;
+                    }
+                }
+            }
+            LOG.info("======  Waiting for tracing updates to be propagated 
========");
+            Thread.sleep(1000);
+            retries++;
+        }
+        return found;
+    }
+
+    private abstract class TraceChecker {
+        public boolean foundTrace(TraceHolder currentTrace) {
+            return false;
+        }
+
+        public boolean foundTrace(TraceHolder currentTrace, SpanInfo currentSpan) {
+            return false;
+        }
+    }
+
+    private static class CountDownConnection extends DelegatingConnection {
+        private CountDownLatch commit;
+
+        public CountDownConnection(Connection conn, CountDownLatch commit) {
+            super(conn);
+            this.commit = commit;
+        }
+
+        @Override
+        public void commit() throws SQLException {
+            commit.countDown();
+            super.commit();
+        }
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java
new file mode 100644
index 0000000..5ea677e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.PhoenixAbstractMetric;
+import org.apache.phoenix.metrics.PhoenixMetricTag;
+import org.apache.phoenix.metrics.PhoenixMetricsRecord;
+import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled;
+import org.apache.phoenix.trace.TraceReader.SpanInfo;
+import org.apache.phoenix.trace.TraceReader.TraceHolder;
+import org.cloudera.htrace.Span;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * Test that the {@link TraceReader} will correctly read traces written by the
+ * {@link PhoenixTableMetricsWriter}
+ */
+@RunWith(Hadoop1TracingTestEnabler.class)
+@Hadoop1Disabled("tracing")
+public class TraceReaderTest extends BaseTracingTestIT {
+
+    private static final Log LOG = LogFactory.getLog(TraceReaderTest.class);
+
+    @Test
+    public void singleSpan() throws Exception {
+        PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter();
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        sink.initForTesting(conn);
+
+        // create a simple metrics record
+        long traceid = 987654;
+        PhoenixMetricsRecord record =
+                createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13,
+                    "host-name.value", "test annotation for a span");
+
+        // start a reader
+        validateTraces(Collections.singletonList(record), conn, traceid);
+    }
+
+    private PhoenixMetricsRecord createAndFlush(PhoenixTableMetricsWriter sink, long traceid,
+            long parentid, long spanid, String desc, long startTime, long endTime, String hostname,
+            String... tags) {
+        PhoenixMetricsRecord record =
+                createRecord(traceid, parentid, spanid, desc, startTime, endTime, hostname, tags);
+        sink.addMetrics(record);
+        sink.flush();
+        return record;
+    }
+
+    /**
+     * Test multiple spans within the same trace. Some spans are independent of the parent span,
+     * some are child spans
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultipleSpans() throws Exception {
+        // hook up a phoenix sink
+        PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter();
+        Connection conn = getConnectionWithoutTracing();
+        sink.initForTesting(conn);
+
+        // create a simple metrics record
+        long traceid = 12345;
+        List<PhoenixMetricsRecord> records = new ArrayList<PhoenixMetricsRecord>();
+        PhoenixMetricsRecord record =
+                createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30,
+                    "hostname.value", "root-span tag");
+        records.add(record);
+
+        // then create a child record
+        record =
+                createAndFlush(sink, traceid, 7777, 6666, "c1", 11, 15, "hostname.value",
+                    "first child");
+        records.add(record);
+
+        // create a different child
+        record =
+                createAndFlush(sink, traceid, 7777, 5555, "c2", 11, 18, "hostname.value",
+                    "second child");
+        records.add(record);
+
+        // create a child of the second child
+        record =
+                createAndFlush(sink, traceid, 5555, 4444, "c3", 12, 16, "hostname.value",
+                    "third child");
+        records.add(record);
+
+        // flush all the values to the table
+        sink.flush();
+
+        // start a reader
+        validateTraces(records, conn, traceid);
+    }
+
+    private void validateTraces(List<PhoenixMetricsRecord> records, Connection conn, long traceid)
+            throws Exception {
+        TraceReader reader = new TraceReader(conn);
+        Collection<TraceHolder> traces = reader.readAll(1);
+        assertEquals("Got an unexpected number of traces!", 1, traces.size());
+        // make sure the trace matches what we wrote
+        TraceHolder trace = traces.iterator().next();
+        assertEquals("Got an unexpected traceid", traceid, trace.traceid);
+        assertEquals("Got an unexpected number of spans", records.size(), 
trace.spans.size());
+
+        validateTrace(records, trace);
+    }
+
+    /**
+     * Validate each span in the trace against the record from which it was written
+     * @param records metric records, in write order
+     * @param trace trace read back from the table
+     */
+    private void validateTrace(List<PhoenixMetricsRecord> records, TraceHolder trace) {
+        // drop each span into a sorted list so we get the expected ordering
+        Iterator<SpanInfo> spanIter = trace.spans.iterator();
+        for (PhoenixMetricsRecord record : records) {
+            SpanInfo spanInfo = spanIter.next();
+            LOG.info("Checking span:\n" + spanInfo);
+            Iterator<PhoenixAbstractMetric> metricIter = record.metrics().iterator();
+            assertEquals("Got an unexpected span id", metricIter.next().value(), spanInfo.id);
+            long parentId = (Long) metricIter.next().value();
+            if (parentId == Span.ROOT_SPAN_ID) {
+                assertNull("Got a parent, but it was a root span!", spanInfo.parent);
+            } else {
+                assertEquals("Got an unexpected parent span id", parentId, spanInfo.parent.id);
+            }
+            assertEquals("Got an unexpected start time", metricIter.next().value(), spanInfo.start);
+            assertEquals("Got an unexpected end time", metricIter.next().value(), spanInfo.end);
+
+            Iterator<PhoenixMetricTag> tags = record.tags().iterator();
+
+            int annotationCount = 0;
+            while (tags.hasNext()) {
+                // hostname is a tag, so we differentiate it
+                PhoenixMetricTag tag = tags.next();
+                if (tag.name().equals(MetricInfo.HOSTNAME.traceName)) {
+                    assertEquals("Didn't store correct hostname value", 
tag.value(),
+                        spanInfo.hostname);
+                } else {
+                    int count = annotationCount++;
+                    assertEquals("Didn't get expected annotation", count + " - 
" + tag.value(),
+                        spanInfo.annotations.get(count));
+                }
+            }
+            assertEquals("Didn't get expected number of annotations", 
annotationCount,
+                spanInfo.annotationCount);
+        }
+    }
+}
\ No newline at end of file
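
For reference, the trace that testMultipleSpans writes (and that TraceReader must reassemble from four independently flushed records under traceid 12345) has the following shape; span ids, descriptions, and start/end times are taken from the createAndFlush calls above:

    root (spanid 7777, parent ROOT_SPAN_ID, time 10-30)
      |- c1 (spanid 6666, parent 7777, time 11-15)
      |- c2 (spanid 5555, parent 7777, time 11-18)
           |- c3 (spanid 4444, parent 5555, time 12-16)

validateTrace then checks each stored SpanInfo against the record it came from, in order.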

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
new file mode 100644
index 0000000..7dc90f8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.call;
+
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Helper class to run a Call with a set of {@link CallWrapper}
+ */
+public class CallRunner {
+
+    /**
+     * Helper {@link Callable} that also declares the type of exception it will throw, to help with
+     * type safety/generics for Java
+     * @param <V> value type returned
+     * @param <E> type of checked exception thrown
+     */
+    public static interface CallableThrowable<V, E extends Exception> extends Callable<V> {
+        @Override
+        public V call() throws E;
+    }
+
+    private static final Log LOG = LogFactory.getLog(CallRunner.class);
+
+    private CallRunner() {
+        // no ctor for util class
+    }
+
+    public static <V, E extends Exception, T extends CallableThrowable<V, E>> V run(T call,
+            CallWrapper... wrappers) throws E {
+        try {
+            for (CallWrapper wrap : wrappers) {
+                wrap.before();
+            }
+            return call.call();
+        } finally {
+            // have to go in reverse, to match the before logic
+            for (int i = wrappers.length - 1; i >= 0; i--) {
+                try {
+                    wrappers[i].after();
+                } catch (Exception e) {
+                    LOG.error("Failed to complete wrapper " + wrappers[i], e);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java b/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java
new file mode 100644
index 0000000..b84dd5d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.call;
+
+/**
+ * Pre- and post-call hooks run around a call made through {@link CallRunner}
+ */
+public interface CallWrapper {
+
+    public void before();
+
+    public void after();
+
+}
\ No newline at end of file
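
Taken together, CallRunner and CallWrapper form a small around-advice utility: before() hooks run in declaration order, the call executes, and after() hooks run in reverse order even if the call throws. A minimal usage sketch under those semantics (the LoggingWrapper and the callable below are illustrative only, not part of this patch):

import java.sql.SQLException;

import org.apache.phoenix.call.CallRunner;
import org.apache.phoenix.call.CallRunner.CallableThrowable;
import org.apache.phoenix.call.CallWrapper;

public class CallRunnerExample {
    // illustrative wrapper: logs entry/exit around the wrapped call
    static class LoggingWrapper implements CallWrapper {
        @Override
        public void before() { System.out.println("before call"); }

        @Override
        public void after() { System.out.println("after call"); }
    }

    public static void main(String[] args) throws SQLException {
        // CallableThrowable pins the checked exception type, so run()
        // rethrows SQLException directly rather than a generic Exception
        String result = CallRunner.run(new CallableThrowable<String, SQLException>() {
            @Override
            public String call() throws SQLException {
                return "done";
            }
        }, new LoggingWrapper());
        System.out.println(result); // prints "done"
    }
}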

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 3ca0ce3..c04511b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -19,12 +19,16 @@ package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ServerUtil;
+import org.cloudera.htrace.Span;
 
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -50,6 +54,17 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin";
     public static final String VIEW_CONSTANTS = "_ViewConstants";
 
+    /** Exposed for testing */
+    public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
+    protected Configuration rawConf;
+
+    @Override
+    public void start(CoprocessorEnvironment e) throws IOException {
+        super.start(e);
+        this.rawConf =
+                ((RegionCoprocessorEnvironment) e).getRegionServerServices().getConfiguration();
+    }
+
     /**
      * Used by logger to identify coprocessor
      */
@@ -66,10 +81,27 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * 
      */
     @Override
-    public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
+    public final RegionScanner postScannerOpen(
+            final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+            final RegionScanner s) throws IOException {
+        // turn on tracing, if it's enabled
+        final Span child = Tracing.childOnServer(scan, rawConf, SCANNER_OPENED_TRACE_INFO);
         try {
-            return doPostScannerOpen(c, scan, s);
+            final RegionScanner scanner = doPostScannerOpen(c, scan, s);
+            return new DelegateRegionScanner(scanner) {
+                @Override
+                public void close() throws IOException {
+                    if (child != null) {
+                        child.stop();
+                    }
+                    delegate.close();
+                }
+
+            };
         } catch (Throwable t) {
+            if (child != null) {
+                child.stop();
+            }
             ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t);
             return null; // impossible
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
new file mode 100644
index 0000000..f88a931
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+public class DelegateRegionScanner implements RegionScanner {
+
+    protected final RegionScanner delegate;
+
+    public DelegateRegionScanner(RegionScanner scanner) {
+        this.delegate = scanner;
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return delegate.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() throws IOException {
+        return delegate.isFilterDone();
+    }
+
+    @Override
+    public boolean reseek(byte[] row) throws IOException {
+        return delegate.reseek(row);
+    }
+
+    @Override
+    public long getMvccReadPoint() {
+        return delegate.getMvccReadPoint();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    public long getMaxResultSize() {
+        return delegate.getMaxResultSize();
+    }
+
+    public boolean next(List<Cell> arg0, int arg1) throws IOException {
+        return delegate.next(arg0, arg1);
+    }
+
+    public boolean next(List<Cell> arg0) throws IOException {
+        return delegate.next(arg0);
+    }
+
+    public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException {
+        return delegate.nextRaw(arg0, arg1);
+    }
+
+    public boolean nextRaw(List<Cell> arg0) throws IOException {
+        return delegate.nextRaw(arg0);
+    }
+}
\ No newline at end of file
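
DelegateRegionScanner is a plain pass-through by design: subclasses override only the lifecycle hook they care about, which is how postScannerOpen above ties the server-side span to the scanner's close(). A minimal sketch of that idiom (the helper method and its span argument are illustrative; span creation is elided):

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.coprocessor.DelegateRegionScanner;
import org.cloudera.htrace.Span;

public class TracedScannerSketch {
    // wrap a scanner so an open span is stopped exactly when the scan ends
    static RegionScanner traced(RegionScanner scanner, final Span span) {
        return new DelegateRegionScanner(scanner) {
            @Override
            public void close() throws IOException {
                if (span != null) { // a null span means tracing was off
                    span.stop();
                }
                delegate.close();
            }
        };
    }
}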

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
index be96956..8a270e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
@@ -58,9 +58,12 @@ import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.trace.TracingIterator;
+import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
+import org.cloudera.htrace.TraceScope;
 
 import com.google.common.collect.Lists;
 
@@ -203,7 +206,7 @@ public abstract class BasicQueryPlan implements QueryPlan {
             }
         }
         ResultIterator iterator = newIterator();
-        return dependencies.isEmpty() ? 
+        iterator = dependencies.isEmpty() ?
                 iterator : new DelegateResultIterator(iterator) {
             @Override
             public void close() throws SQLException {
@@ -214,6 +217,12 @@ public abstract class BasicQueryPlan implements QueryPlan {
                 }
             }
         };
+
+        // wrap the iterator so we start/end tracing as we expect
+        TraceScope scope =
+                Tracing.startNewSpan(context.getConnection(), "Creating basic query for "
+                        + getPlanSteps(iterator));
+        return (scope.getSpan() != null) ? new TracingIterator(scope, iterator) : iterator;
     }
 
     private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) {
@@ -349,8 +358,12 @@ public abstract class BasicQueryPlan implements QueryPlan {
         // Optimize here when getting explain plan, as queries don't get optimized until after compilation
         QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this);
         ResultIterator iterator = plan.iterator();
+        return new ExplainPlan(getPlanSteps(iterator));
+    }
+
+    private List<String> getPlanSteps(ResultIterator iterator) {
         List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
         iterator.explain(planSteps);
-        return new ExplainPlan(planSteps);
+        return planSteps;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 410b8b4..6519b66 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -49,11 +49,14 @@ import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.ServerUtil;
+import org.cloudera.htrace.Span;
+import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -333,6 +336,10 @@ public class MutationState implements SQLCloseable {
         long[] serverTimeStamps = validate();
        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
        List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+
+        // add tracing for this operation
+        TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
+        Span span = trace.getSpan();
         while (iterator.hasNext()) {
            Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
@@ -348,6 +355,10 @@ public class MutationState implements SQLCloseable {
                 byte[] htableName = pair.getFirst();
                 List<Mutation> mutations = pair.getSecond();
                 
+                // create a span per target table
+                // TODO maybe we can be smarter about the table name to string here?
+                Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
+
                 int retryCount = 0;
                 boolean shouldRetry = false;
                 do {
@@ -358,6 +369,7 @@ public class MutationState implements SQLCloseable {
                        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) {
                            IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
                            cache = client.addIndexMetadataCache(mutations, tempPtr);
+                            child.addTimelineAnnotation("Updated index metadata cache");
                            uuidValue = cache.getId();
                            // If we haven't retried yet, retry for this case only, as it's possible that
                            // a split will occur after we send the index metadata cache to all known
@@ -385,7 +397,9 @@ public class MutationState implements SQLCloseable {
                     try {
                        if (logger.isDebugEnabled()) logMutationSize(hTable, mutations);
                         long startTime = System.currentTimeMillis();
+                        child.addTimelineAnnotation("Attempt " + retryCount);
                         hTable.batch(mutations);
+                        child.stop();
                         shouldRetry = false;
                        if (logger.isDebugEnabled()) logger.debug("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms");
                         committedList.add(entry);
@@ -396,8 +410,15 @@ public class MutationState implements SQLCloseable {
                                // Swallow this exception once, as it's possible that we split after sending the index metadata
                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
                                // If it fails again, we don't retry.
-                                logger.warn("Swallowing exception and retrying after clearing meta cache on connection. " + inferredE);
+                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                logger.warn(msg);
                                 
connection.getQueryServices().clearTableRegionCache(htableName);
+
+                                // add a new child span as this one failed
+                                child.addTimelineAnnotation(msg);
+                                child.stop();
+                                child = Tracing.child(span, "Failed batch, attempting retry");
+
                                 continue;
                             }
                             e = inferredE;
@@ -432,6 +453,7 @@ public class MutationState implements SQLCloseable {
             numRows -= entry.getValue().size();
             iterator.remove(); // Remove batches as we process them
         }
+        trace.close();
         assert(numRows==0);
         assert(this.mutations.isEmpty());
     }
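
The tracing added to commit() above follows one lifecycle: open a TraceScope for the whole commit, fork a child span per table batch, annotate and stop the child as its batch runs, and close the scope at the end. A condensed sketch of that shape, assuming the connection is a PhoenixConnection (as in MutationState) and using only the Tracing calls introduced in this patch; batching, retries, and error handling are elided:

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.trace.util.Tracing;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.TraceScope;

public class CommitTracingSketch {
    // condensed shape of the tracing in MutationState.commit()
    static void tracedCommit(PhoenixConnection connection, byte[][] htableNames) {
        TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
        Span span = trace.getSpan();
        for (byte[] htableName : htableNames) {
            Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
            // ... hTable.batch(mutations) would run here, with
            // child.addTimelineAnnotation(...) marking each attempt ...
            child.stop(); // end the per-table span once its batch completes
        }
        trace.close(); // end the parent scope
    }
}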
