PHOENIX-177: Collect usage and performance metrics. Add basic Dapper-like tracing (using Cloudera's HTrace library) to Phoenix requests. This is the basic infrastructure to support more holistic, non-profiler-based analysis.
This patch includes, among other things, the infrastructure to use HTrace, async-tracing handling via the Hadoop metrics2 framework, and trace read/write to a Phoenix table. Currently, tracing does NOT support Hadoop1 (though Phoenix itself still works against Hadoop1). The build now defaults to hadoop2 rather than hadoop1 (particularly as hadoop1 is now a second-class citizen). Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b7f46c10 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b7f46c10 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b7f46c10 Branch: refs/heads/master Commit: b7f46c1051de3e23630dccb82677a0a16985f27c Parents: 9185f76 Author: Jesse Yates <jya...@apache.org> Authored: Fri Jun 6 16:11:32 2014 -0700 Committer: Jesse Yates <jya...@apache.org> Committed: Mon Jul 28 06:37:49 2014 -0700 ---------------------------------------------------------------------- phoenix-core/pom.xml | 47 +-- .../apache/phoenix/trace/BaseTracingTestIT.java | 117 ++++++ .../phoenix/trace/DelegatingConnection.java | 328 +++++++++++++++ .../phoenix/trace/DisableableMetricsWriter.java | 83 ++++ .../trace/Hadoop1TracingTestEnabler.java | 86 ++++ .../apache/phoenix/trace/PhoenixMetricImpl.java | 44 ++ .../phoenix/trace/PhoenixMetricRecordImpl.java | 71 ++++ .../trace/PhoenixTableMetricsWriterIT.java | 119 ++++++ .../apache/phoenix/trace/PhoenixTagImpl.java | 52 +++ .../phoenix/trace/PhoenixTracingEndToEndIT.java | 401 +++++++++++++++++++ .../apache/phoenix/trace/TraceReaderTest.java | 181 +++++++++ .../org/apache/phoenix/call/CallRunner.java | 66 +++ .../org/apache/phoenix/call/CallWrapper.java | 29 ++ .../coprocessor/BaseScannerRegionObserver.java | 36 +- .../coprocessor/DelegateRegionScanner.java | 78 ++++ .../apache/phoenix/execute/BasicQueryPlan.java | 17 +- .../apache/phoenix/execute/MutationState.java | 24 +- .../org/apache/phoenix/hbase/index/Indexer.java | 39 ++ .../phoenix/iterate/ParallelIterators.java | 5 +- 
.../apache/phoenix/jdbc/PhoenixConnection.java | 38 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 25 +- .../trace/PhoenixTableMetricsWriter.java | 255 ++++++++++++ .../org/apache/phoenix/trace/TraceReader.java | 375 +++++++++++++++++ .../apache/phoenix/trace/TracingIterator.java | 58 +++ .../trace/util/ConfigurationAdapter.java | 56 +++ .../org/apache/phoenix/trace/util/NullSpan.java | 112 ++++++ .../org/apache/phoenix/trace/util/Tracing.java | 282 +++++++++++++ .../phoenix/util/PhoenixContextExecutor.java | 23 ++ .../java/org/apache/phoenix/util/QueryUtil.java | 29 ++ .../test/resources/hadoop-metrics2.properties | 25 ++ .../src/test/resources/log4j.properties | 4 +- phoenix-hadoop-compat/pom.xml | 31 +- .../org/apache/phoenix/metrics/MetricInfo.java | 51 +++ .../org/apache/phoenix/metrics/Metrics.java | 39 ++ .../apache/phoenix/metrics/MetricsManager.java | 58 +++ .../apache/phoenix/metrics/MetricsWriter.java | 31 ++ .../phoenix/metrics/PhoenixAbstractMetric.java | 30 ++ .../phoenix/metrics/PhoenixMetricTag.java | 27 ++ .../phoenix/metrics/PhoenixMetricsRecord.java | 35 ++ .../phoenix/trace/PhoenixSpanReceiver.java | 26 ++ .../phoenix/trace/TestableMetricsWriter.java | 30 ++ .../org/apache/phoenix/trace/TracingCompat.java | 94 +++++ .../org/apache/phoenix/metrics/LoggingSink.java | 56 +++ .../phoenix/metrics/TracingTestCompat.java | 45 +++ phoenix-hadoop2-compat/pom.xml | 47 ++- .../phoenix/metrics/MetricsManagerImpl.java | 71 ++++ .../apache/phoenix/trace/MetricsInfoImpl.java | 63 +++ .../phoenix/trace/PhoenixMetricsWriter.java | 176 ++++++++ .../apache/phoenix/trace/TraceMetricSource.java | 192 +++++++++ .../org.apache.phoenix.metrics.MetricsManager | 1 + ...org.apache.phoenix.trace.PhoenixSpanReceiver | 1 + ...g.apache.phoenix.trace.TestableMetricsWriter | 1 + .../metrics2/impl/ExposedMetricCounterLong.java | 35 ++ .../metrics2/impl/ExposedMetricsRecordImpl.java | 43 ++ .../metrics2/lib/ExposedMetricsInfoImpl.java | 32 ++ 
.../phoenix/trace/PhoenixMetricsWriterTest.java | 142 +++++++ .../org/apache/phoenix/trace/TracingTest.java | 34 ++ pom.xml | 40 +- 58 files changed, 4479 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index f5c3ace..cfdee95 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -299,25 +299,23 @@ <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> - <version>${collections.version}</version> - </dependency> + </dependency> </dependencies> <profiles> - <!-- Profile for building against Hadoop 1. Active by default. Not used if another - Hadoop profile is specified with mvn -Dhadoop.profile=foo --> + <!-- Profile for building against Hadoop 1. Activate using: mvn -Dhadoop.profile=1--> <profile> <id>hadoop-1</id> <activation> <property> - <name>!hadoop.profile</name> + <name>hadoop.profile</name> + <value>1</value> </property> </activation> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId> - <version>${hbase-hadoop1.version}</version> <exclusions> <exclusion> <groupId>org.jruby</groupId> @@ -341,22 +339,18 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> - <version>${hbase-hadoop1.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> - <version>${hbase-hadoop1.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> - <version>${hbase-hadoop1.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> - <version>${hadoop-one.version}</version> <exclusions> <exclusion> 
<groupId>hsqldb</groupId> @@ -383,27 +377,29 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-test</artifactId> - <version>${hadoop-one.version}</version> <optional>true</optional> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-hadoop1-compat</artifactId> + </dependency> </dependencies> </profile> - <!-- Profile for building against Hadoop 2. Activate using: mvn -Dhadoop.profile=2 --> + <!-- Profile for building against Hadoop 2. Active by default. Not used if another + Hadoop profile is specified with mvn -Dhadoop.profile=foo --> <profile> <id>hadoop-2</id> <activation> <property> - <name>hadoop.profile</name> - <value>2</value> + <name>!hadoop.profile</name> </property> </activation> <dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId> - <version>${hbase-hadoop2.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> @@ -415,23 +411,14 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> - <version>${hbase-hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <version>${hbase-hadoop2.version}</version> - <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> - <version>${hbase-hadoop2.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> - <version>${hbase-hadoop2.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> @@ -447,13 +434,11 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> - <version>${hbase-hadoop2.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> - 
<version>${hbase-hadoop2.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> @@ -486,6 +471,16 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-hadoop2-compat</artifactId> + </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-hadoop2-compat</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java new file mode 100644 index 0000000..59cd871 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.trace; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.metrics.Metrics; +import org.apache.phoenix.metrics.PhoenixAbstractMetric; +import org.apache.phoenix.metrics.PhoenixMetricTag; +import org.apache.phoenix.metrics.PhoenixMetricsRecord; +import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.trace.util.Tracing.Frequency; + +/** + * Base test for tracing tests - helps manage getting tracing/non-tracing + * connections, as well as any supporting utils. + */ +public class BaseTracingTestIT extends BaseHBaseManagedTimeIT { + private static final Log LOG = LogFactory.getLog(BaseTracingTestIT.class); + + /** + * Hadoop1 doesn't yet support tracing (need metrics library support) so we just skip those + * tests for the moment + * @return <tt>true</tt> if the test should exit because some necessary classes are missing, or + * <tt>false</tt> if the tests can continue normally + */ + static boolean shouldEarlyExitForHadoop1Test() { + try { + // get a receiver for the spans + TracingCompat.newTraceMetricSource(); + // which also needs to a source for the metrics system + Metrics.getManager(); + return false; + } catch (RuntimeException e) { + LOG.error("Shouldn't run test because can't instantiate necessary metrics/tracing classes!"); + } + + return true; + } + + public static Connection getConnectionWithoutTracing() throws SQLException { + Properties props = new Properties(TEST_PROPERTIES); + return getConnectionWithoutTracing(props); + } + + public static Connection getConnectionWithoutTracing(Properties props) throws SQLException { + Connection conn = 
getConnectionWithTracingFrequency(props, Frequency.NEVER); + conn.setAutoCommit(false); + return conn; + } + + public static Connection getTracingConnection() throws Exception { + Properties props = new Properties(TEST_PROPERTIES); + return getConnectionWithTracingFrequency(props, Tracing.Frequency.ALWAYS); + } + + public static Connection getConnectionWithTracingFrequency(Properties props, + Tracing.Frequency frequency) throws SQLException { + Tracing.setSampling(props, frequency); + return DriverManager.getConnection(getUrl(), props); + } + + public static PhoenixMetricsRecord createRecord(long traceid, long parentid, long spanid, + String desc, long startTime, long endTime, String hostname, String... tags) { + PhoenixMetricRecordImpl record = + new PhoenixMetricRecordImpl(TracingCompat.getTraceMetricName(traceid), desc); + PhoenixAbstractMetric span = new PhoenixMetricImpl(MetricInfo.SPAN.traceName, spanid); + record.addMetric(span); + + PhoenixAbstractMetric parent = new PhoenixMetricImpl(MetricInfo.PARENT.traceName, parentid); + record.addMetric(parent); + + PhoenixAbstractMetric start = new PhoenixMetricImpl(MetricInfo.START.traceName, startTime); + record.addMetric(start); + + PhoenixAbstractMetric end = new PhoenixMetricImpl(MetricInfo.END.traceName, endTime); + record.addMetric(end); + + int tagCount = 0; + for (String annotation : tags) { + PhoenixMetricTag tag = + new PhoenixTagImpl(MetricInfo.ANNOTATION.traceName, + Integer.toString(tagCount++), annotation); + record.addTag(tag); + } + String hostnameValue = "host-name.value"; + PhoenixMetricTag hostnameTag = + new PhoenixTagImpl(MetricInfo.HOSTNAME.traceName, "", hostnameValue); + record.addTag(hostnameTag); + + return record; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java new file mode 100644 index 0000000..261522d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DelegatingConnection.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +import org.apache.phoenix.jdbc.Jdbc7Shim; + +/** + * Simple {@link Connection} that just delegates to an underlying {@link Connection}. 
+ * @param <D> delegate type that is both a {@link Connection} and a {@link Jdbc7Shim#Connection} + */ +public class DelegatingConnection<D extends Connection & Jdbc7Shim.Connection> implements + Connection, Jdbc7Shim.Connection { + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return conn.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return conn.isWrapperFor(iface); + } + + @Override + public Statement createStatement() throws SQLException { + return conn.createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return conn.prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return conn.prepareCall(sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return conn.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + conn.setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return conn.getAutoCommit(); + } + + @Override + public void commit() throws SQLException { + conn.commit(); + } + + @Override + public void rollback() throws SQLException { + conn.rollback(); + } + + @Override + public void close() throws SQLException { + conn.close(); + } + + @Override + public boolean isClosed() throws SQLException { + return conn.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return conn.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + conn.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return conn.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + conn.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return 
conn.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + conn.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return conn.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return conn.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + conn.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return conn.createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement + prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return conn.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + return conn.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return conn.getTypeMap(); + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + conn.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + conn.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return conn.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return conn.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return conn.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + conn.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + conn.releaseSavepoint(savepoint); + } + + 
@Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return conn.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return conn.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return conn.prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return conn.prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return conn.prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return conn.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return conn.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return conn.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return conn.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return conn.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + conn.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException 
{ + conn.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return conn.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return conn.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return conn.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return conn.createStruct(typeName, attributes); + } + + private D conn; + + public DelegatingConnection(D conn) { + this.conn = conn; + } + + @Override + public void setSchema(String schema) throws SQLException { + conn.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return conn.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + conn.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + conn.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return conn.getNetworkTimeout(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java new file mode 100644 index 0000000..a054bf2 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.metrics.MetricsWriter; +import org.apache.phoenix.metrics.PhoenixMetricsRecord; + +/** + * + */ +public class DisableableMetricsWriter implements MetricsWriter { + + private static final Log LOG = LogFactory.getLog(DisableableMetricsWriter.class); + private PhoenixTableMetricsWriter writer; + private AtomicBoolean disabled = new AtomicBoolean(false); + + public DisableableMetricsWriter(PhoenixTableMetricsWriter writer) { + this.writer = writer; + } + + @Override + public void initialize() { + if (this.disabled.get()) return; + writer.initialize(); + } + + @Override + public void flush() { + if (this.disabled.get()) { + clear(); + return; + } + writer.flush(); + + } + + @Override + public void addMetrics(PhoenixMetricsRecord record) { + if (this.disabled.get()) return; + writer.addMetrics(record); + } + + public void disable() { + this.disabled.set(true); + } + + public void enable() { + this.disabled.set(false); + } + + public void clear() { + // clear any pending writes + try { + writer.clearForTesting(); + } catch (SQLException e) { + 
LOG.error("Couldn't clear the delgate writer when flush called and disabled", e); + } + } + + public PhoenixTableMetricsWriter getDelegate() { + return this.writer; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java new file mode 100644 index 0000000..07e2305 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/Hadoop1TracingTestEnabler.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.trace; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.runner.notification.RunNotifier; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.InitializationError; + +/** + * Test runner to run classes that depend on Hadoop1 compatibility that may not be present for the + * feature + */ +public class Hadoop1TracingTestEnabler extends BlockJUnit4ClassRunner { + + public Hadoop1TracingTestEnabler(Class<?> klass) throws InitializationError { + super(klass); + } + + @Override + public void runChild(FrameworkMethod method, RunNotifier notifier) { + // if the class is already disabled, then we can disable on the class level, otherwise we + // just check the per-method + Hadoop1Disabled condition = + getTestClass().getJavaClass().getAnnotation(Hadoop1Disabled.class); + if (condition == null) { + condition = method + .getAnnotation(Hadoop1Disabled.class); + } + + // if this has the flag, then we want to disable it if hadoop1 is not enabled for that + // feature + if (condition != null && getEnabled(condition.value())) { + super.runChild(method, notifier); + } else { + notifier.fireTestIgnored(describeChild(method)); + } + } + + /** + * Simple check that just uses if-else logic. We can move to something more complex, policy + * based later when this gets more complex. 
+ * @param feature name of the feature to check + * @return <tt>true</tt> if the test method is enabled for the given feature, <tt>false</tt> + * otherwise + */ + private boolean getEnabled(String feature) { + if (feature.equals("tracing")) { + return !BaseTracingTestIT.shouldEarlyExitForHadoop1Test(); + } + return true; + } + + /** + * Marker that a class/method should be disabled if hadoop1 features are not enabled. It takes a + * value for the Hadoop1 feature on which this class/method depends, for instance "tracing" is + * not supported in Hadoop1 (yet). + */ + @Target({ ElementType.TYPE, ElementType.METHOD }) + @Retention(RetentionPolicy.RUNTIME) + public static @interface Hadoop1Disabled { + String value(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java new file mode 100644 index 0000000..985504f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricImpl.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import org.apache.phoenix.metrics.PhoenixAbstractMetric; + +/** + * Simple metric implementation for testing + */ +public class PhoenixMetricImpl implements PhoenixAbstractMetric { + + private String name; + private Number value; + + public PhoenixMetricImpl(String name, Number value) { + this.name = name; + this.value = value; + } + + @Override + public String getName() { + return name; + } + + @Override + public Number value() { + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java new file mode 100644 index 0000000..45cabf0 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixMetricRecordImpl.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.trace; + +import java.util.Collection; +import java.util.List; + +import org.apache.phoenix.metrics.PhoenixAbstractMetric; +import org.apache.phoenix.metrics.PhoenixMetricTag; +import org.apache.phoenix.metrics.PhoenixMetricsRecord; + +import com.google.common.collect.Lists; + +/** + * + */ +public class PhoenixMetricRecordImpl implements PhoenixMetricsRecord { + + private String name; + private String description; + private final List<PhoenixAbstractMetric> metrics = Lists.newArrayList(); + private final List<PhoenixMetricTag> tags = Lists.newArrayList(); + + public PhoenixMetricRecordImpl(String name, String description) { + this.name = name; + this.description = description; + } + + public void addMetric(PhoenixAbstractMetric metric) { + this.metrics.add(metric); + } + + public void addTag(PhoenixMetricTag tag) { + this.tags.add(tag); + } + + @Override + public String name() { + return this.name; + } + + @Override + public String description() { + return this.description; + } + + @Override + public Iterable<PhoenixAbstractMetric> metrics() { + return metrics; + } + + @Override + public Collection<PhoenixMetricTag> tags() { + return tags; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java new file mode 100644 index 0000000..b1544e8 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.util.Collection; + +import org.apache.phoenix.metrics.PhoenixMetricsRecord; +import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled; +import org.apache.phoenix.trace.TraceReader.SpanInfo; +import org.apache.phoenix.trace.TraceReader.TraceHolder; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Test that the logging sink stores the expected metrics/stats + */ +@RunWith(Hadoop1TracingTestEnabler.class) +@Hadoop1Disabled("tracing") +public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { + + /** + * IT should create the target table if it hasn't been created yet, but not fail if the table + * has already been created + * @throws Exception on failure + */ + @Test + public void testCreatesTable() throws Exception { + PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + Connection conn = getConnectionWithoutTracing(); + sink.initForTesting(conn); + + // check for existence of the tracing table + try { + String ddl = "CREATE TABLE " + TracingCompat.DEFAULT_STATS_TABLE_NAME; + conn.createStatement().execute(ddl); + fail("Table " + 
TracingCompat.DEFAULT_STATS_TABLE_NAME + + " was not created by the metrics sink"); + } catch (Exception e) { + // expected + } + + // initialize sink again, which should attempt to create the table, but not fail + try { + sink.initForTesting(conn); + } catch (Exception e) { + fail("Initialization shouldn't fail if table already exists!"); + } + } + + /** + * Simple metrics writing and reading check, that uses the standard wrapping in the + * {@link PhoenixMetricsWriter} + * @throws Exception on failure + */ + @Test + public void writeMetrics() throws Exception { + // hook up a phoenix sink + PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + Connection conn = getConnectionWithoutTracing(); + sink.initForTesting(conn); + + // create a simple metrics record + long traceid = 987654; + String description = "Some generic trace"; + long spanid = 10; + long parentid = 11; + long startTime = 12; + long endTime = 13; + String annotation = "test annotation for a span"; + String hostnameValue = "host-name.value"; + PhoenixMetricsRecord record = + createRecord(traceid, parentid, spanid, description, startTime, endTime, + hostnameValue, annotation); + + // actually write the record to the table + sink.addMetrics(record); + sink.flush(); + + // make sure we only get expected stat entry (matching the trace id), otherwise we could get + // the stats for the update as well + TraceReader reader = new TraceReader(conn); + Collection<TraceHolder> traces = reader.readAll(10); + assertEquals("Wrong number of traces in the tracing table", 1, traces.size()); + + // validate trace + TraceHolder trace = traces.iterator().next(); + // we are just going to get an orphan span b/c we don't send in a parent + assertEquals("Didn't get expected orphaned spans!" 
+ trace.orphans, 1, trace.orphans.size()); + + assertEquals(traceid, trace.traceid); + SpanInfo spanInfo = trace.orphans.get(0); + assertEquals(description, spanInfo.description); + assertEquals(parentid, spanInfo.getParentIdForTesting()); + assertEquals(startTime, spanInfo.start); + assertEquals(endTime, spanInfo.end); + assertEquals(hostnameValue, spanInfo.hostname); + assertEquals("Wrong number of tags", 0, spanInfo.tagCount); + assertEquals("Wrong number of annotations", 1, spanInfo.annotationCount); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java new file mode 100644 index 0000000..c8e2219 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTagImpl.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.trace; + +import org.apache.phoenix.metrics.PhoenixMetricTag; + +/** + * Simple Tag implementation for testing + */ +public class PhoenixTagImpl implements PhoenixMetricTag { + + private final String name; + private final String description; + private final String value; + + public PhoenixTagImpl(String name, String description, String value) { + super(); + this.name = name; + this.description = description; + this.value = value; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + + @Override + public String value() { + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java new file mode 100644 index 0000000..955d640 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.metrics.Metrics; +import org.apache.phoenix.metrics.TracingTestCompat; +import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled; +import org.apache.phoenix.trace.TraceReader.SpanInfo; +import org.apache.phoenix.trace.TraceReader.TraceHolder; +import org.cloudera.htrace.Sampler; +import org.cloudera.htrace.Span; +import org.cloudera.htrace.SpanReceiver; +import org.cloudera.htrace.Trace; +import org.cloudera.htrace.TraceScope; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Test that the logging sink stores the expected metrics/stats + */ +@RunWith(Hadoop1TracingTestEnabler.class) +@Hadoop1Disabled("tracing") +public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { + + private static final Log LOG = LogFactory.getLog(PhoenixTracingEndToEndIT.class); + private static final int MAX_RETRIES = 10; + private final String table = "ENABLED_FOR_LOGGING"; + private final String index = "ENABALED_FOR_LOGGING_INDEX"; + + private static DisableableMetricsWriter sink; + + @BeforeClass + public static void setupMetrics() throws Exception { + if (shouldEarlyExitForHadoop1Test()) { + return; + } + PhoenixTableMetricsWriter pWriter = new PhoenixTableMetricsWriter(); + Connection conn = getConnectionWithoutTracing(); + 
pWriter.initForTesting(conn); + sink = new DisableableMetricsWriter(pWriter); + + TracingTestCompat.registerSink(sink); + } + + @After + public void cleanup() { + sink.disable(); + sink.clear(); + sink.enable(); + + // LISTENABLE.clearListeners(); + } + + private static void waitForCommit(CountDownLatch latch) throws SQLException { + Connection conn = new CountDownConnection(getConnectionWithoutTracing(), latch); + replaceWriterConnection(conn); + } + + private static void replaceWriterConnection(Connection conn) throws SQLException { + // disable the writer + sink.disable(); + + // swap the connection for one that listens + sink.getDelegate().initForTesting(conn); + + // enable the writer + sink.enable(); + } + + /** + * Simple test that we can correctly write spans to the phoenix table + * @throws Exception on failure + */ + @Test + public void testWriteSpans() throws Exception { + // get a receiver for the spans + SpanReceiver receiver = TracingCompat.newTraceMetricSource(); + // which also needs to be a source for the metrics system + Metrics.getManager().registerSource("testWriteSpans-source", "source for testWriteSpans", + receiver); + + // watch our sink so we know when commits happen + CountDownLatch latch = new CountDownLatch(1); + waitForCommit(latch); + + // write some spans + TraceScope trace = Trace.startSpan("Start write test", Sampler.ALWAYS); + Span span = trace.getSpan(); + + // add a child with some annotations + Span child = span.child("child 1"); + child.addTimelineAnnotation("timeline annotation"); + TracingCompat.addAnnotation(child, "test annotation", 10); + child.stop(); + + // sleep a little bit to get some time difference + Thread.sleep(100); + + trace.close(); + + // pass the trace on + receiver.receiveSpan(span); + + // wait for the tracer to actually do the write + latch.await(); + + // look for the writes to make sure they were made + Connection conn = getConnectionWithoutTracing(); + checkStoredTraces(conn, new TraceChecker() { + public 
boolean foundTrace(TraceHolder trace, SpanInfo info) { + if (info.description.equals("child 1")) { + assertEquals("Not all annotations present", 1, info.annotationCount); + assertEquals("Not all tags present", 1, info.tagCount); + boolean found = false; + for (String annotation : info.annotations) { + if (annotation.startsWith("test annotation")) { + found = true; + } + } + assertTrue("Missing the annotations in span: " + info, found); + found = false; + for (String tag : info.tags) { + if (tag.endsWith("timeline annotation")) { + found = true; + } + } + assertTrue("Missing the tags in span: " + info, found); + return true; + } + return false; + } + }); + } + + /** + * Test that span will actually go into this sink and be written on both sides of the wire, + * through the indexing code. + * @throws Exception + */ + @Test + public void testClientServerIndexingTracing() throws Exception { + // one call for client side, one call for server side + final CountDownLatch updated = new CountDownLatch(2); + waitForCommit(updated); + + // separate connection so we don't create extra traces + Connection conn = getConnectionWithoutTracing(); + createTestTable(conn, true); + + // trace the requests we send + Connection traceable = getTracingConnection(); + LOG.debug("Doing dummy the writes to the tracked table"); + String insert = "UPSERT INTO " + table + " VALUES (?, ?)"; + PreparedStatement stmt = traceable.prepareStatement(insert); + stmt.setString(1, "key1"); + stmt.setLong(2, 1); + // this first trace just does a simple open/close of the span. It's not doing anything + // terribly interesting because we aren't auto-committing on the connection, so it just + // updates the mutation state and returns. 
+ stmt.execute(); + stmt.setString(1, "key2"); + stmt.setLong(2, 2); + stmt.execute(); + traceable.commit(); + + // wait for the latch to countdown, as the metrics system is time-based + LOG.debug("Waiting for latch to complete!"); + updated.await(200, TimeUnit.SECONDS);// should be way more than GC pauses + + // read the traces back out + + /* Expected: + * 1. Single element trace - for first PreparedStatement#execute span + * 2. Two element trace for second PreparedStatement#execute span + * a. execute call + * b. metadata lookup* + * 3. Commit trace. + * a. Committing to tables + * i. Committing to single table + * ii. hbase batch write* + * i.I. span on server + * i.II. building index updates + * i.III. waiting for latch + * where '*' is a generically named thread (e.g phoenix-1-thread-X) + */ + boolean indexingCompleted = checkStoredTraces(conn, new TraceChecker() { + public boolean foundTrace(TraceHolder trace, SpanInfo span) { + String traceInfo = trace.toString(); + // skip logging traces that are just traces about tracing + if (traceInfo.contains(TracingCompat.DEFAULT_STATS_TABLE_NAME)) { + return false; + } + if (traceInfo.contains("Completing index")) { + return true; + } + return false; + } + }); + + assertTrue("Never found indexing updates", indexingCompleted); + } + + private void createTestTable(Connection conn, boolean withIndex) throws SQLException { + // create a dummy table + String ddl = + "create table if not exists " + table + "(" + "k varchar not null, " + "c1 bigint" + + " CONSTRAINT pk PRIMARY KEY (k))"; + conn.createStatement().execute(ddl); + + // early exit if we don't need to create an index + if (!withIndex) { + return; + } + // create an index on the table - we know indexing has some basic tracing + ddl = "CREATE INDEX IF NOT EXISTS " + index + " on " + table + " (c1)"; + conn.createStatement().execute(ddl); + conn.commit(); + } + + @Test + public void testScanTracing() throws Exception { + // separate connections to minimize amount 
of traces that are generated + Connection traceable = getTracingConnection(); + Connection conn = getConnectionWithoutTracing(); + + // one call for client side, one call for server side + CountDownLatch updated = new CountDownLatch(2); + waitForCommit(updated); + + // create a dummy table + createTestTable(conn, false); + + // update the table, but don't trace these, to simplify the traces we read + LOG.debug("Doing dummy the writes to the tracked table"); + String insert = "UPSERT INTO " + table + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(insert); + stmt.setString(1, "key1"); + stmt.setLong(2, 1); + stmt.execute(); + conn.commit(); + conn.rollback(); + + // setup for next set of updates + stmt.setString(1, "key2"); + stmt.setLong(2, 2); + stmt.execute(); + conn.commit(); + conn.rollback(); + + // do a scan of the table + String read = "SELECT * FROM " + table; + ResultSet results = traceable.createStatement().executeQuery(read); + assertTrue("Didn't get first result", results.next()); + assertTrue("Didn't get second result", results.next()); + results.close(); + + assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS)); + // don't trace reads either + boolean tracingComplete = checkStoredTraces(conn, new TraceChecker(){ + + @Override + public boolean foundTrace(TraceHolder currentTrace) { + String traceInfo = currentTrace.toString(); + return traceInfo.contains("Parallel scanner"); + } + }); + assertTrue("Didn't find the parallel scanner in the tracing", tracingComplete); + } + + @Test + public void testScanTracingOnServer() throws Exception { + // separate connections to minimize amount of traces that are generated + Connection traceable = getTracingConnection(); + Connection conn = getConnectionWithoutTracing(); + + // one call for client side, one call for server side + CountDownLatch updated = new CountDownLatch(2); + waitForCommit(updated); + + // create a dummy table + createTestTable(conn, false); + 
+ // update the table, but don't trace these, to simplify the traces we read + LOG.debug("Doing dummy the writes to the tracked table"); + String insert = "UPSERT INTO " + table + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(insert); + stmt.setString(1, "key1"); + stmt.setLong(2, 1); + stmt.execute(); + conn.commit(); + conn.rollback(); + + // setup for next set of updates + stmt.setString(1, "key2"); + stmt.setLong(2, 2); + stmt.execute(); + conn.commit(); + conn.rollback(); + + // do a scan of the table + String read = "SELECT COUNT(*) FROM " + table; + ResultSet results = traceable.createStatement().executeQuery(read); + assertTrue("Didn't get count result", results.next()); + // make sure we got the expected count + assertEquals("Didn't get the expected number of row", 2, results.getInt(1)); + results.close(); + + assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS)); + // don't trace reads either + boolean found = checkStoredTraces(conn, new TraceChecker() { + public boolean foundTrace(TraceHolder trace) { + String traceInfo = trace.toString(); + return traceInfo.contains(BaseScannerRegionObserver.SCANNER_OPENED_TRACE_INFO); + } + }); + assertTrue("Didn't find the parallel scanner in the tracing", found); + } + + private boolean checkStoredTraces(Connection conn, TraceChecker checker) throws Exception { + TraceReader reader = new TraceReader(conn); + int retries = 0; + boolean found = false; + outer: while (retries < MAX_RETRIES) { + Collection<TraceHolder> traces = reader.readAll(100); + for (TraceHolder trace : traces) { + LOG.info("Got trace: " + trace); + found = checker.foundTrace(trace); + if (found) { + break outer; + } + for (SpanInfo span : trace.spans) { + found = checker.foundTrace(trace, span); + if (found) { + break outer; + } + } + } + LOG.info("====== Waiting for tracing updates to be propagated ========"); + Thread.sleep(1000); + retries++; + } + return found; + } + + private abstract 
class TraceChecker { + public boolean foundTrace(TraceHolder currentTrace) { + return false; + } + + public boolean foundTrace(TraceHolder currentTrace, SpanInfo currentSpan) { + return false; + } + } + + private static class CountDownConnection extends DelegatingConnection { + private CountDownLatch commit; + + public CountDownConnection(Connection conn, CountDownLatch commit) { + super(conn); + this.commit = commit; + } + + @Override + public void commit() throws SQLException { + commit.countDown(); + super.commit(); + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java new file mode 100644 index 0000000..5ea677e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/TraceReaderTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.trace; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.metrics.PhoenixAbstractMetric; +import org.apache.phoenix.metrics.PhoenixMetricTag; +import org.apache.phoenix.metrics.PhoenixMetricsRecord; +import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled; +import org.apache.phoenix.trace.TraceReader.SpanInfo; +import org.apache.phoenix.trace.TraceReader.TraceHolder; +import org.cloudera.htrace.Span; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Test that the {@link TraceReader} will correctly read traces written by the + * {@link PhoenixTableMetricsWriter} + */ +@RunWith(Hadoop1TracingTestEnabler.class) +@Hadoop1Disabled("tracing") +public class TraceReaderTest extends BaseTracingTestIT { + + private static final Log LOG = LogFactory.getLog(TraceReaderTest.class); + + @Test + public void singleSpan() throws Exception { + PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + Properties props = new Properties(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + sink.initForTesting(conn); + + // create a simple metrics record + long traceid = 987654; + PhoenixMetricsRecord record = + createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13, + "host-name.value", "test annotation for a span"); + + // start a reader + validateTraces(Collections.singletonList(record), conn, traceid); + } + + private PhoenixMetricsRecord 
createAndFlush(PhoenixTableMetricsWriter sink, long traceid, + long parentid, long spanid, String desc, long startTime, long endTime, String hostname, + String... tags) { + PhoenixMetricsRecord record = + createRecord(traceid, parentid, spanid, desc, startTime, endTime, hostname, tags); + sink.addMetrics(record); + sink.flush(); + return record; + } + + /** + * Test multiple spans, within the same trace. Some spans are independent of the parent span, + * some are child spans + * @throws Exception on failure + */ + @Test + public void testMultipleSpans() throws Exception { + // hook up a phoenix sink + PhoenixTableMetricsWriter sink = new PhoenixTableMetricsWriter(); + Connection conn = getConnectionWithoutTracing(); + sink.initForTesting(conn); + + // create a simple metrics record + long traceid = 12345; + List<PhoenixMetricsRecord> records = new ArrayList<PhoenixMetricsRecord>(); + PhoenixMetricsRecord record = + createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30, + "hostname.value", "root-span tag"); + records.add(record); + + // then create a child record + record = + createAndFlush(sink, traceid, 7777, 6666, "c1", 11, 15, "hostname.value", + "first child"); + records.add(record); + + // create a different child + record = + createAndFlush(sink, traceid, 7777, 5555, "c2", 11, 18, "hostname.value", + "second child"); + records.add(record); + + // create a child of the second child + record = + createAndFlush(sink, traceid, 5555, 4444, "c3", 12, 16, "hostname.value", + "third child"); + records.add(record); + + // flush all the values to the table + sink.flush(); + + // start a reader + validateTraces(records, conn, traceid); + } + + private void validateTraces(List<PhoenixMetricsRecord> records, Connection conn, long traceid) + throws Exception { + TraceReader reader = new TraceReader(conn); + Collection<TraceHolder> traces = reader.readAll(1); + assertEquals("Got an unexpected number of traces!", 1, traces.size()); + // make sure the trace 
matches what we wrote + TraceHolder trace = traces.iterator().next(); + assertEquals("Got an unexpected traceid", traceid, trace.traceid); + assertEquals("Got an unexpected number of spans", records.size(), trace.spans.size()); + + validateTrace(records, trace); + } + + /** + * @param records + * @param trace + */ + private void validateTrace(List<PhoenixMetricsRecord> records, TraceHolder trace) { + // drop each span into a sorted list so we get the expected ordering + Iterator<SpanInfo> spanIter = trace.spans.iterator(); + for (PhoenixMetricsRecord record : records) { + SpanInfo spanInfo = spanIter.next(); + LOG.info("Checking span:\n" + spanInfo); + Iterator<PhoenixAbstractMetric> metricIter = record.metrics().iterator(); + assertEquals("Got an unexpected span id", metricIter.next().value(), spanInfo.id); + long parentId = (Long) metricIter.next().value(); + if (parentId == Span.ROOT_SPAN_ID) { + assertNull("Got a parent, but it was a root span!", spanInfo.parent); + } else { + assertEquals("Got an unexpected parent span id", parentId, spanInfo.parent.id); + } + assertEquals("Got an unexpected start time", metricIter.next().value(), spanInfo.start); + assertEquals("Got an unexpected end time", metricIter.next().value(), spanInfo.end); + + Iterator<PhoenixMetricTag> tags = record.tags().iterator(); + + int annotationCount = 0; + while (tags.hasNext()) { + // hostname is a tag, so we differentiate it + PhoenixMetricTag tag = tags.next(); + if (tag.name().equals(MetricInfo.HOSTNAME.traceName)) { + assertEquals("Didn't store correct hostname value", tag.value(), + spanInfo.hostname); + } else { + int count = annotationCount++; + assertEquals("Didn't get expected annotation", count + " - " + tag.value(), + spanInfo.annotations.get(count)); + } + } + assertEquals("Didn't get expected number of annotations", annotationCount, + spanInfo.annotationCount); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java new file mode 100644 index 0000000..7dc90f8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/call/CallRunner.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.call; + +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Helper class to run a Call with a set of {@link CallWrapper} + */ +public class CallRunner { + + /** + * Helper {@link Callable} that also declares the type of exception it will throw, to help with + * type safety/generics for java + * @param <V> value type returned + * @param <E> type of checked exception thrown + */ + public static interface CallableThrowable<V, E extends Exception> extends Callable<V> { + @Override + public V call() throws E; + } + + private static final Log LOG = LogFactory.getLog(CallRunner.class); + + private CallRunner() { + // no ctor for util class + } + + public static <V, E extends Exception, T extends CallableThrowable<V, E>> V run(T call, + CallWrapper... wrappers) throws E { + try { + for (CallWrapper wrap : wrappers) { + wrap.before(); + } + return call.call(); + } finally { + // have to go in reverse, to match the before logic + for (int i = wrappers.length - 1; i >= 0; i--) { + try { + wrappers[i].after(); + } catch (Exception e) { + LOG.error("Failed to complete wrapper " + wrappers[i], e); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java b/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java new file mode 100644 index 0000000..b84dd5d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/call/CallWrapper.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+package org.apache.phoenix.call;
+
+/**
+ * Pair of hooks that {@code CallRunner.run} invokes around the call it executes.
+ */
+public interface CallWrapper {
+
+    /**
+     * Invoked by {@code CallRunner.run} before the wrapped call is made. Wrappers are invoked
+     * in the order in which they were passed to the runner.
+     */
+    public void before();
+
+    /**
+     * Invoked by {@code CallRunner.run} from a {@code finally} block, i.e. whether the wrapped
+     * call returned or threw. Wrappers are invoked in the reverse of the {@link #before()}
+     * order. An {@link Exception} thrown from here is logged by the runner and not propagated.
+     */
+    public void after();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 3ca0ce3..c04511b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -19,12 +19,16 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ServerUtil; +import
org.cloudera.htrace.Span; abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @@ -50,6 +54,17 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; public static final String VIEW_CONSTANTS = "_ViewConstants"; + /** Exposed for testing */ + public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; + protected Configuration rawConf; + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + super.start(e); + this.rawConf = + ((RegionCoprocessorEnvironment) e).getRegionServerServices().getConfiguration(); + } + /** * Used by logger to identify coprocessor */ @@ -66,10 +81,27 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * */ @Override - public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { + public final RegionScanner postScannerOpen( + final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, + final RegionScanner s) throws IOException { + // turn on tracing, if its enabled + final Span child = Tracing.childOnServer(scan, rawConf, SCANNER_OPENED_TRACE_INFO); try { - return doPostScannerOpen(c, scan, s); + final RegionScanner scanner = doPostScannerOpen(c, scan, s); + return new DelegateRegionScanner(scanner) { + @Override + public void close() throws IOException { + if (child != null) { + child.stop(); + } + delegate.close(); + } + + }; } catch (Throwable t) { + if (child != null) { + child.stop(); + } ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); return null; // impossible } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff 
--git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java new file mode 100644 index 0000000..f88a931 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+/**
+ * {@link RegionScanner} that forwards every call, unchanged, to a wrapped scanner. Intended as
+ * a base class: subclasses override only the methods whose behavior they need to alter and
+ * reach the wrapped scanner through {@link #delegate}.
+ */
+public class DelegateRegionScanner implements RegionScanner {
+
+    /** The wrapped scanner that all calls are forwarded to; visible to subclasses. */
+    protected final RegionScanner delegate;
+
+    /**
+     * @param scanner scanner to forward all calls to; stored as-is (no null check is made here)
+     */
+    public DelegateRegionScanner(RegionScanner scanner) {
+        this.delegate = scanner;
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return delegate.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() throws IOException {
+        return delegate.isFilterDone();
+    }
+
+    @Override
+    public boolean reseek(byte[] row) throws IOException {
+        return delegate.reseek(row);
+    }
+
+    @Override
+    public long getMvccReadPoint() {
+        return delegate.getMvccReadPoint();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    // NOTE(review): the remaining methods carry no @Override, unlike those above. Presumably
+    // this is deliberate, to tolerate RegionScanner signature differences between HBase
+    // versions -- confirm before adding the annotation.
+
+    public long getMaxResultSize() {
+        return delegate.getMaxResultSize();
+    }
+
+    public boolean next(List<Cell> arg0, int arg1) throws IOException {
+        return delegate.next(arg0, arg1);
+    }
+
+    public boolean next(List<Cell> arg0) throws IOException {
+        return delegate.next(arg0);
+    }
+
+    public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException {
+        return delegate.nextRaw(arg0, arg1);
+    }
+
+    public boolean nextRaw(List<Cell> arg0) throws IOException {
+        return delegate.nextRaw(arg0);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java index be96956..8a270e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java +++
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java @@ -58,9 +58,12 @@ import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.trace.TracingIterator; +import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; +import org.cloudera.htrace.TraceScope; import com.google.common.collect.Lists; @@ -203,7 +206,7 @@ public abstract class BasicQueryPlan implements QueryPlan { } } ResultIterator iterator = newIterator(); - return dependencies.isEmpty() ? + iterator = dependencies.isEmpty() ? iterator : new DelegateResultIterator(iterator) { @Override public void close() throws SQLException { @@ -214,6 +217,12 @@ public abstract class BasicQueryPlan implements QueryPlan { } } }; + + // wrap the iterator so we start/end tracing as we expect + TraceScope scope = + Tracing.startNewSpan(context.getConnection(), "Creating basic query for " + + getPlanSteps(iterator)); + return (scope.getSpan() != null) ? 
new TracingIterator(scope, iterator) : iterator; } private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) { @@ -349,8 +358,12 @@ public abstract class BasicQueryPlan implements QueryPlan { // Optimize here when getting explain plan, as queries don't get optimized until after compilation QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(), this); ResultIterator iterator = plan.iterator(); + return new ExplainPlan(getPlanSteps(iterator)); + } + + private List<String> getPlanSteps(ResultIterator iterator){ List<String> planSteps = Lists.newArrayListWithExpectedSize(5); iterator.explain(planSteps); - return new ExplainPlan(planSteps); + return planSteps; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7f46c10/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 410b8b4..6519b66 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -49,11 +49,14 @@ import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.ServerUtil; +import org.cloudera.htrace.Span; +import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -333,6 +336,10 @@ public class MutationState implements SQLCloseable { long[] serverTimeStamps = validate(); 
Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator(); List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size()); + + // add tracing for this operation + TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); + Span span = trace.getSpan(); while (iterator.hasNext()) { Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next(); Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue(); @@ -348,6 +355,10 @@ public class MutationState implements SQLCloseable { byte[] htableName = pair.getFirst(); List<Mutation> mutations = pair.getSecond(); + //create a span per target table + //TODO maybe we can be smarter about the table name to string here? + Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); + int retryCount = 0; boolean shouldRetry = false; do { @@ -358,6 +369,7 @@ public class MutationState implements SQLCloseable { if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) { IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); cache = client.addIndexMetadataCache(mutations, tempPtr); + child.addTimelineAnnotation("Updated index metadata cache"); uuidValue = cache.getId(); // If we haven't retried yet, retry for this case only, as it's possible that // a split will occur after we send the index metadata cache to all known @@ -385,7 +397,9 @@ public class MutationState implements SQLCloseable { try { if (logger.isDebugEnabled()) logMutationSize(hTable, mutations); long startTime = System.currentTimeMillis(); + child.addTimelineAnnotation("Attempt " + retryCount); hTable.batch(mutations); + child.stop(); shouldRetry = false; if (logger.isDebugEnabled()) logger.debug("Total time for batch call of " + mutations.size() + " 
mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms"); committedList.add(entry); @@ -396,8 +410,15 @@ public class MutationState implements SQLCloseable { // Swallow this exception once, as it's possible that we split after sending the index metadata // and one of the region servers doesn't have it. This will cause it to have it the next go around. // If it fails again, we don't retry. - logger.warn("Swallowing exception and retrying after clearing meta cache on connection. " + inferredE); + String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; + logger.warn(msg); connection.getQueryServices().clearTableRegionCache(htableName); + + // add a new child span as this one failed + child.addTimelineAnnotation(msg); + child.stop(); + child = Tracing.child(span,"Failed batch, attempting retry"); + continue; } e = inferredE; @@ -432,6 +453,7 @@ public class MutationState implements SQLCloseable { numRows -= entry.getValue().size(); iterator.remove(); // Remove batches as we process them } + trace.close(); assert(numRows==0); assert(this.mutations.isEmpty()); }