Repository: phoenix Updated Branches: refs/heads/master dc9e305b1 -> 7c157ec93
PHOENIX-3752 Remove hadoop metrics integration from the tracing framework (Karan Mehta) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7c157ec9 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7c157ec9 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7c157ec9 Branch: refs/heads/master Commit: 7c157ec937b9cdd8d48b372709e2484fdde59338 Parents: dc9e305 Author: Samarth Jain <[email protected]> Authored: Fri May 26 10:36:59 2017 -0700 Committer: Samarth Jain <[email protected]> Committed: Fri May 26 10:36:59 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/trace/BaseTracingTestIT.java | 66 ++++ .../phoenix/trace/DisableableMetricsWriter.java | 84 ----- .../trace/PhoenixTableMetricsWriterIT.java | 43 ++- .../phoenix/trace/PhoenixTraceReaderIT.java | 180 ----------- .../phoenix/trace/PhoenixTracingEndToEndIT.java | 223 ++++++++----- .../org/apache/phoenix/query/QueryServices.java | 4 + .../phoenix/query/QueryServicesOptions.java | 78 +++-- .../apache/phoenix/trace/TraceMetricSource.java | 183 ----------- .../org/apache/phoenix/trace/TraceReader.java | 16 +- .../apache/phoenix/trace/TraceSpanReceiver.java | 95 ++++++ .../org/apache/phoenix/trace/TraceWriter.java | 323 +++++++++++++++++++ .../org/apache/phoenix/trace/util/Tracing.java | 15 +- .../phoenix/trace/TraceMetricsSourceTest.java | 37 +-- 13 files changed, 745 insertions(+), 602 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java index eed5618..e3a7510 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/BaseTracingTestIT.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsInfo; @@ -36,6 +37,8 @@ import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.impl.ExposedMetricCounterLong; import org.apache.hadoop.metrics2.impl.ExposedMetricsRecordImpl; import org.apache.hadoop.metrics2.lib.ExposedMetricsInfoImpl; +import org.apache.htrace.Span; +import org.apache.htrace.impl.MilliSpan; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.metrics.MetricInfo; import org.apache.phoenix.trace.util.Tracing; @@ -50,6 +53,10 @@ import org.apache.phoenix.util.PropertiesUtil; public class BaseTracingTestIT extends ParallelStatsDisabledIT { + protected CountDownLatch latch; + protected int defaultTracingThreadPoolForTest = 1; + protected int defaultTracingBatchSizeForTest = 1; + public static Connection getConnectionWithoutTracing() throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); return getConnectionWithoutTracing(props); @@ -127,4 +134,63 @@ public class BaseTracingTestIT extends ParallelStatsDisabledIT { private static MetricsInfo asInfo(String name) { return new ExposedMetricsInfoImpl(name, ""); } + + protected Span createNewSpan(long traceid, long parentid, long spanid, String description, + long startTime, long endTime, String processid, String... 
tags) { + + Span span = new MilliSpan.Builder() + .description(description) + .traceId(traceid) + .parents(new long[] {parentid}) + .spanId(spanid) + .processId(processid) + .begin(startTime) + .end(endTime) + .build(); + + int tagCount = 0; + for(String annotation : tags) { + span.addKVAnnotation((Integer.toString(tagCount++)).getBytes(), annotation.getBytes()); + } + return span; + } + + + private static class CountDownConnection extends DelegateConnection { + private CountDownLatch commit; + + public CountDownConnection(Connection conn, CountDownLatch commit) { + super(conn); + this.commit = commit; + } + + @Override + public void commit() throws SQLException { + super.commit(); + commit.countDown(); + } + + } + + protected class TestTraceWriter extends TraceWriter { + + public TestTraceWriter(String tableName, int numThreads, int batchSize) { + super(tableName, numThreads, batchSize); + } + + @Override + protected Connection getConnection(String tableName) { + try { + Connection connection = new CountDownConnection(getConnectionWithoutTracing(), latch); + if(!traceTableExists(connection, tableName)) { + createTable(connection, tableName); + } + return connection; + } catch (SQLException e) { + e.printStackTrace(); + } + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java deleted file mode 100644 index 875717c..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/DisableableMetricsWriter.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.trace; - -import org.apache.commons.configuration.SubsetConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.MetricsRecord; -import org.apache.hadoop.metrics2.MetricsSink; - -import java.sql.SQLException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * - */ -public class DisableableMetricsWriter implements MetricsSink { - - private static final Log LOG = LogFactory.getLog(DisableableMetricsWriter.class); - private PhoenixMetricsSink writer; - private AtomicBoolean disabled = new AtomicBoolean(false); - - public DisableableMetricsWriter(PhoenixMetricsSink writer) { - this.writer = writer; - } - - @Override - public void init(SubsetConfiguration config) { - if (this.disabled.get()) return; - writer.init(config); - } - - @Override - public void flush() { - if (this.disabled.get()) { - clear(); - return; - } - writer.flush(); - - } - - @Override - public void putMetrics(MetricsRecord record) { - if (this.disabled.get()) return; - writer.putMetrics(record); - } - - public void disable() { - this.disabled.set(true); - } - - public void enable() { - this.disabled.set(false); - } - - public void clear() { - // clear any pending writes - try { - 
writer.clearForTesting(); - } catch (SQLException e) { - LOG.error("Couldn't clear the delgate writer when flush called and disabled", e); - } - } - - public PhoenixMetricsSink getDelegate() { - return this.writer; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java index dbb34ba..88ab6ff 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTableMetricsWriterIT.java @@ -18,12 +18,15 @@ package org.apache.phoenix.trace; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; -import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.htrace.Span; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.trace.TraceReader.SpanInfo; import org.apache.phoenix.trace.TraceReader.TraceHolder; @@ -34,6 +37,7 @@ import org.junit.Test; */ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { + private TestTraceWriter testTraceWriter; /** * IT should create the target table if it hasn't been created yet, but not fail if the table * has already been created @@ -41,10 +45,9 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { */ @Test public void testCreatesTable() throws Exception { - PhoenixMetricsSink sink = new PhoenixMetricsSink(); + + testTraceWriter = new TestTraceWriter(generateUniqueName(), defaultTracingThreadPoolForTest, 
defaultTracingBatchSizeForTest); Connection conn = getConnectionWithoutTracing(); - String tableName = generateUniqueName(); - sink.initForTesting(conn, tableName); // check for existence of the tracing table try { @@ -56,26 +59,21 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { // expected } - // initialize sink again, which should attempt to create the table, but not fail - try { - sink.initForTesting(conn, tableName); - } catch (Exception e) { - fail("Initialization shouldn't fail if table already exists!"); - } } /** * Simple metrics writing and reading check, that uses the standard wrapping in the - * {@link PhoenixMetricsSink} + * {@link TraceWriter} * @throws Exception on failure */ @Test public void writeMetrics() throws Exception { - // hook up a phoenix sink - PhoenixMetricsSink sink = new PhoenixMetricsSink(); + Connection conn = getConnectionWithoutTracing(); String tableName = generateUniqueName(); - sink.initForTesting(conn, tableName); + TraceSpanReceiver traceSpanReceiver = new TraceSpanReceiver(); + latch = new CountDownLatch(1); + testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); // create a simple metrics record long traceid = 987654; @@ -84,15 +82,14 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { long parentid = 11; long startTime = 12; long endTime = 13; + String processid = "Some process"; String annotation = "test annotation for a span"; - String hostnameValue = "host-name.value"; - MetricsRecord record = - createRecord(traceid, parentid, spanid, description, startTime, endTime, - hostnameValue, annotation); - // actually write the record to the table - sink.putMetrics(record); - sink.flush(); + Span span = createNewSpan(traceid, parentid, spanid, description, startTime, endTime, + processid, annotation); + + traceSpanReceiver.getSpanQueue().add(span); + assertTrue("Span never committed to table", latch.await(30, 
TimeUnit.SECONDS)); // make sure we only get expected stat entry (matcing the trace id), otherwise we could the // stats for the update as well @@ -111,8 +108,8 @@ public class PhoenixTableMetricsWriterIT extends BaseTracingTestIT { assertEquals(parentid, spanInfo.getParentIdForTesting()); assertEquals(startTime, spanInfo.start); assertEquals(endTime, spanInfo.end); - assertEquals(hostnameValue, spanInfo.hostname); assertEquals("Wrong number of tags", 0, spanInfo.tagCount); assertEquals("Wrong number of annotations", 1, spanInfo.annotationCount); } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java deleted file mode 100644 index 723810f..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTraceReaderIT.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.trace; - -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.AbstractMetric; -import org.apache.hadoop.metrics2.MetricsRecord; -import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.htrace.Span; -import org.apache.phoenix.metrics.MetricInfo; -import org.apache.phoenix.trace.TraceReader.SpanInfo; -import org.apache.phoenix.trace.TraceReader.TraceHolder; -import org.junit.Test; - -/** - * Test that the {@link TraceReader} will correctly read traces written by the - * {@link org.apache.phoenix.trace.PhoenixMetricsSink} - */ - -public class PhoenixTraceReaderIT extends BaseTracingTestIT { - - private static final Log LOG = LogFactory.getLog(PhoenixTraceReaderIT.class); - - @Test - public void singleSpan() throws Exception { - PhoenixMetricsSink sink = new PhoenixMetricsSink(); - Properties props = new Properties(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - String tableName = generateUniqueName(); - sink.initForTesting(conn, tableName); - - // create a simple metrics record - long traceid = 987654; - MetricsRecord record = - createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13, - "host-name.value", "test annotation for a span"); - - // start a reader - validateTraces(Collections.singletonList(record), conn, traceid, tableName); - } - - private MetricsRecord createAndFlush(PhoenixMetricsSink sink, long traceid, - long parentid, long spanid, String desc, long startTime, long endTime, String hostname, - String... 
tags) { - MetricsRecord record = - createRecord(traceid, parentid, spanid, desc, startTime, endTime, hostname, tags); - sink.putMetrics(record); - sink.flush(); - return record; - } - - /** - * Test multiple spans, within the same trace. Some spans are independent of the parent span, - * some are child spans - * @throws Exception on failure - */ - @Test - public void testMultipleSpans() throws Exception { - // hook up a phoenix sink - PhoenixMetricsSink sink = new PhoenixMetricsSink(); - Connection conn = getConnectionWithoutTracing(); - String tableName = generateUniqueName(); - sink.initForTesting(conn, tableName); - - // create a simple metrics record - long traceid = 12345; - List<MetricsRecord> records = new ArrayList<MetricsRecord>(); - MetricsRecord record = - createAndFlush(sink, traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30, - "hostname.value", "root-span tag"); - records.add(record); - - // then create a child record - record = - createAndFlush(sink, traceid, 7777, 6666, "c1", 11, 15, "hostname.value", - "first child"); - records.add(record); - - // create a different child - record = - createAndFlush(sink, traceid, 7777, 5555, "c2", 11, 18, "hostname.value", - "second child"); - records.add(record); - - // create a child of the second child - record = - createAndFlush(sink, traceid, 5555, 4444, "c3", 12, 16, "hostname.value", - "third child"); - records.add(record); - - // flush all the values to the table - sink.flush(); - - // start a reader - validateTraces(records, conn, traceid, tableName); - } - - private void validateTraces(List<MetricsRecord> records, Connection conn, long traceid, String tableName) - throws Exception { - TraceReader reader = new TraceReader(conn, tableName); - Collection<TraceHolder> traces = reader.readAll(1); - assertEquals("Got an unexpected number of traces!", 1, traces.size()); - // make sure the trace matches what we wrote - TraceHolder trace = traces.iterator().next(); - assertEquals("Got an unexpected traceid", 
traceid, trace.traceid); - assertEquals("Got an unexpected number of spans", records.size(), trace.spans.size()); - - validateTrace(records, trace); - } - - /** - * @param records - * @param trace - */ - private void validateTrace(List<MetricsRecord> records, TraceHolder trace) { - // drop each span into a sorted list so we get the expected ordering - Iterator<SpanInfo> spanIter = trace.spans.iterator(); - for (MetricsRecord record : records) { - SpanInfo spanInfo = spanIter.next(); - LOG.info("Checking span:\n" + spanInfo); - Iterator<AbstractMetric> metricIter = record.metrics().iterator(); - assertEquals("Got an unexpected span id", metricIter.next().value(), spanInfo.id); - long parentId = (Long) metricIter.next().value(); - if (parentId == Span.ROOT_SPAN_ID) { - assertNull("Got a parent, but it was a root span!", spanInfo.parent); - } else { - assertEquals("Got an unexpected parent span id", parentId, spanInfo.parent.id); - } - assertEquals("Got an unexpected start time", metricIter.next().value(), spanInfo.start); - assertEquals("Got an unexpected end time", metricIter.next().value(), spanInfo.end); - - Iterator<MetricsTag> tags = record.tags().iterator(); - - int annotationCount = 0; - while (tags.hasNext()) { - // hostname is a tag, so we differentiate it - MetricsTag tag = tags.next(); - if (tag.name().equals(MetricInfo.HOSTNAME.traceName)) { - assertEquals("Didn't store correct hostname value", tag.value(), - spanInfo.hostname); - } else { - int count = annotationCount++; - assertEquals("Didn't get expected annotation", count + " - " + tag.value(), - spanInfo.annotations.get(count)); - } - } - assertEquals("Didn't get expected number of annotations", annotationCount, - spanInfo.annotationCount); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java index 5e05fe8..4477fa5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/trace/PhoenixTracingEndToEndIT.java @@ -18,9 +18,8 @@ package org.apache.phoenix.trace; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.*; import java.sql.Connection; import java.sql.DriverManager; @@ -28,13 +27,13 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collection; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Sampler; import org.apache.htrace.Span; import org.apache.htrace.SpanReceiver; @@ -43,11 +42,12 @@ import org.apache.htrace.TraceScope; import org.apache.htrace.impl.ProbabilitySampler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.trace.TraceReader.SpanInfo; import org.apache.phoenix.trace.TraceReader.TraceHolder; +import org.apache.phoenix.trace.util.Tracing; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.ImmutableMap; @@ -56,52 +56,26 @@ import com.google.common.collect.ImmutableMap; * Test that the logging 
sink stores the expected metrics/stats */ -// Marking this class as abstract till PHOENIX-3062 is fixed. -// FIXME: PHOENIX-3062 -public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { +public class PhoenixTracingEndToEndIT extends BaseTracingTestIT { private static final Log LOG = LogFactory.getLog(PhoenixTracingEndToEndIT.class); private static final int MAX_RETRIES = 10; private String enabledForLoggingTable; private String enableForLoggingIndex; - - private DisableableMetricsWriter sink; private String tracingTableName; + private TestTraceWriter testTraceWriter = null; @Before public void setupMetrics() throws Exception { - PhoenixMetricsSink pWriter = new PhoenixMetricsSink(); - Connection conn = getConnectionWithoutTracing(); tracingTableName = "TRACING_" + generateUniqueName(); - pWriter.initForTesting(conn, tracingTableName); - sink = new DisableableMetricsWriter(pWriter); enabledForLoggingTable = "ENABLED_FOR_LOGGING_" + generateUniqueName(); enableForLoggingIndex = "ENABALED_FOR_LOGGING_INDEX_" + generateUniqueName(); - - TracingTestUtil.registerSink(sink, tracingTableName); } @After - public void cleanup() { - sink.disable(); - sink.clear(); - TracingTestUtil.unregisterSink(tracingTableName); - } - - private void waitForCommit(CountDownLatch latch) throws SQLException { - Connection conn = new CountDownConnection(getConnectionWithoutTracing(), latch); - replaceWriterConnection(conn); - } - - private void replaceWriterConnection(Connection conn) throws SQLException { - // disable the writer - sink.disable(); - - // swap the connection for one that listens - sink.getDelegate().initForTesting(conn, tracingTableName); - - // enable the writer - sink.enable(); + public void cleanUp() { + if(testTraceWriter != null) + testTraceWriter.stop(); } /** @@ -110,15 +84,11 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { */ @Test public void testWriteSpans() throws Exception { - // get a receiver for the spans - 
SpanReceiver receiver = new TraceMetricSource(); - // which also needs to a source for the metrics system - Metrics.initialize().register("testWriteSpans-source", "source for testWriteSpans", - (MetricsSource) receiver); // watch our sink so we know when commits happen - CountDownLatch latch = new CountDownLatch(1); - waitForCommit(latch); + latch = new CountDownLatch(1); + + testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); // write some spans TraceScope trace = Trace.startSpan("Start write test", Sampler.ALWAYS); @@ -136,7 +106,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { trace.close(); // pass the trace on - receiver.receiveSpan(span); + Tracing.getTraceSpanReceiver().receiveSpan(span); // wait for the tracer to actually do the write assertTrue("Sink not flushed. commit() not called on the connection", latch.await(60, TimeUnit.SECONDS)); @@ -176,10 +146,12 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { * @throws Exception */ @Test + @Ignore public void testClientServerIndexingTracing() throws Exception { + // one call for client side, one call for server side - final CountDownLatch updated = new CountDownLatch(2); - waitForCommit(updated); + latch = new CountDownLatch(2); + testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); // separate connection so we don't create extra traces Connection conn = getConnectionWithoutTracing(); @@ -203,7 +175,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { // wait for the latch to countdown, as the metrics system is time-based LOG.debug("Waiting for latch to complete!"); - updated.await(200, TimeUnit.SECONDS);// should be way more than GC pauses + latch.await(200, TimeUnit.SECONDS);// should be way more than GC pauses // read the traces back out @@ -259,8 +231,8 @@ public abstract class 
PhoenixTracingEndToEndIT extends BaseTracingTestIT { Connection conn = getConnectionWithoutTracing(); // one call for client side, one call for server side - CountDownLatch updated = new CountDownLatch(2); - waitForCommit(updated); + latch = new CountDownLatch(2); + testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); // create a dummy table createTestTable(conn, false); @@ -289,7 +261,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { assertTrue("Didn't get second result", results.next()); results.close(); - assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS)); + assertTrue("Get expected updates to trace table", latch.await(200, TimeUnit.SECONDS)); // don't trace reads either boolean tracingComplete = checkStoredTraces(conn, new TraceChecker(){ @@ -309,8 +281,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { Connection conn = getConnectionWithoutTracing(); // one call for client side, one call for server side - CountDownLatch updated = new CountDownLatch(2); - waitForCommit(updated); + latch = new CountDownLatch(5); + testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); // create a dummy table createTestTable(conn, false); @@ -338,7 +310,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { assertEquals("Didn't get the expected number of row", 2, results.getInt(1)); results.close(); - assertTrue("Didn't get expected updates to trace table", updated.await(60, TimeUnit.SECONDS)); + assertTrue("Didn't get expected updates to trace table", latch.await(60, TimeUnit.SECONDS)); + // don't trace reads either boolean found = checkStoredTraces(conn, new TraceChecker() { @Override @@ -360,8 +333,8 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { Connection conn = getConnectionWithoutTracing(); 
// one call for client side, one call for server side - CountDownLatch updated = new CountDownLatch(2); - waitForCommit(updated); + latch = new CountDownLatch(2); + testTraceWriter = new TestTraceWriter(tracingTableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); // create a dummy table createTestTable(conn, false); @@ -390,7 +363,7 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { assertTrue("Didn't get second result", results.next()); results.close(); - assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS)); + assertTrue("Get expected updates to trace table", latch.await(200, TimeUnit.SECONDS)); assertAnnotationPresent(customAnnotationKey, customAnnotationValue, conn); assertAnnotationPresent(TENANT_ID_ATTRIB, tenantId, conn); @@ -399,12 +372,12 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { @Test public void testTraceOnOrOff() throws Exception { - Connection conn1 = DriverManager.getConnection(getUrl()); + Connection conn1 = getConnectionWithoutTracing(); //DriverManager.getConnection(getUrl()); try{ Statement statement = conn1.createStatement(); ResultSet rs = statement.executeQuery("TRACE ON"); assertTrue(rs.next()); - PhoenixConnection pconn= (PhoenixConnection) conn1; + PhoenixConnection pconn = (PhoenixConnection) conn1; long traceId = pconn.getTraceScope().getSpan().getTraceId(); assertEquals(traceId, rs.getLong(1)); assertEquals(traceId, rs.getLong("trace_id")); @@ -444,16 +417,131 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { rs = statement.executeQuery("TRACE OFF"); assertFalse(rs.next()); + } finally { conn1.close(); } } + @Test + public void testSingleSpan() throws Exception { + + Properties props = new Properties(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + latch = new CountDownLatch(1); + testTraceWriter = new 
TestTraceWriter(tableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); + + // create a simple metrics record + long traceid = 987654; + Span span = createNewSpan(traceid, Span.ROOT_SPAN_ID, 10, "root", 12, 13, "Some process", "test annotation for a span"); + + Tracing.getTraceSpanReceiver().receiveSpan(span); + assertTrue("Updates not written in table", latch.await(60, TimeUnit.SECONDS)); + + // start a reader + validateTraces(Collections.singletonList(span), conn, traceid, tableName); + } + + /** + * Test multiple spans, within the same trace. Some spans are independent of the parent span, + * some are child spans + * @throws Exception on failure + */ + @Test + public void testMultipleSpans() throws Exception { + + Connection conn = getConnectionWithoutTracing(); + String tableName = generateUniqueName(); + latch = new CountDownLatch(4); + testTraceWriter = new TestTraceWriter(tableName, defaultTracingThreadPoolForTest, defaultTracingBatchSizeForTest); + + // create a simple metrics record + long traceid = 12345; + List<Span> spans = new ArrayList<Span>(); + + Span span = + createNewSpan(traceid, Span.ROOT_SPAN_ID, 7777, "root", 10, 30, + "root process", "root-span tag"); + spans.add(span); + + // then create a child record + span = + createNewSpan(traceid, 7777, 6666, "c1", 11, 15, "c1 process", + "first child"); + spans.add(span); + + // create a different child + span = + createNewSpan(traceid, 7777, 5555, "c2", 11, 18, "c2 process", + "second child"); + spans.add(span); + + // create a child of the second child + span = + createNewSpan(traceid, 5555, 4444, "c3", 12, 16, "c3 process", + "third child"); + spans.add(span); + + for(Span span1 : spans) + Tracing.getTraceSpanReceiver().receiveSpan(span1); + + assertTrue("Updates not written in table", latch.await(100, TimeUnit.SECONDS)); + + // start a reader + validateTraces(spans, conn, traceid, tableName); + } + + private void validateTraces(List<Span> spans, Connection conn, long traceid, 
String tableName) + throws Exception { + TraceReader reader = new TraceReader(conn, tableName); + Collection<TraceHolder> traces = reader.readAll(1); + assertEquals("Got an unexpected number of traces!", 1, traces.size()); + // make sure the trace matches what we wrote + TraceHolder trace = traces.iterator().next(); + assertEquals("Got an unexpected traceid", traceid, trace.traceid); + assertEquals("Got an unexpected number of spans", spans.size(), trace.spans.size()); + + validateTrace(spans, trace); + } + + /** + * @param spans + * @param trace + */ + private void validateTrace(List<Span> spans, TraceHolder trace) { + // drop each span into a sorted list so we get the expected ordering + Iterator<SpanInfo> spanIter = trace.spans.iterator(); + for (Span span : spans) { + SpanInfo spanInfo = spanIter.next(); + LOG.info("Checking span:\n" + spanInfo); + + long parentId = span.getParentId(); + if(parentId == Span.ROOT_SPAN_ID) { + assertNull("Got a parent, but it was a root span!", spanInfo.parent); + } else { + assertEquals("Got an unexpected parent span id", parentId, spanInfo.parent.id); + } + + assertEquals("Got an unexpected start time", span.getStartTimeMillis(), spanInfo.start); + assertEquals("Got an unexpected end time", span.getStopTimeMillis(), spanInfo.end); + + int annotationCount = 0; + for(Map.Entry<byte[], byte[]> entry : span.getKVAnnotations().entrySet()) { + int count = annotationCount++; + assertEquals("Didn't get expected annotation", count + " - " + Bytes.toString(entry.getValue()), + spanInfo.annotations.get(count)); + } + assertEquals("Didn't get expected number of annotations", annotationCount, + spanInfo.annotationCount); + } + } + private void assertAnnotationPresent(final String annotationKey, final String annotationValue, Connection conn) throws Exception { boolean tracingComplete = checkStoredTraces(conn, new TraceChecker(){ @Override public boolean foundTrace(TraceHolder currentTrace) { - return 
currentTrace.toString().contains(annotationKey + " - " + annotationValue); + return currentTrace.toString().contains(annotationKey + " - " + annotationValue); } }); @@ -496,19 +584,4 @@ public abstract class PhoenixTracingEndToEndIT extends BaseTracingTestIT { } } - private static class CountDownConnection extends DelegateConnection { - private CountDownLatch commit; - - public CountDownConnection(Connection conn, CountDownLatch commit) { - super(conn); - this.commit = commit; - } - - @Override - public void commit() throws SQLException { - commit.countDown(); - super.commit(); - } - - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 81d05bd..331b596 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -155,6 +155,10 @@ public interface QueryServices extends SQLCloseable { public static final String TRACING_PROBABILITY_THRESHOLD_ATTRIB = "phoenix.trace.probability.threshold"; public static final String TRACING_STATS_TABLE_NAME_ATTRIB = "phoenix.trace.statsTableName"; public static final String TRACING_CUSTOM_ANNOTATION_ATTRIB_PREFIX = "phoenix.trace.custom.annotation."; + public static final String TRACING_ENABLED = "phoenix.trace.enabled"; + public static final String TRACING_BATCH_SIZE = "phoenix.trace.batchSize"; + public static final String TRACING_THREAD_POOL_SIZE = "phoenix.trace.threadPoolSize"; + public static final String TRACING_TRACE_BUFFER_SIZE = "phoenix.trace.traceBufferSize"; public static final String USE_REVERSE_SCAN_ATTRIB = "phoenix.query.useReverseScan"; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 35eda60..b1d8a7e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -82,6 +82,11 @@ import static org.apache.phoenix.query.QueryServices.TRANSACTIONS_ENABLED; import static org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING; import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.TRACING_ENABLED; +import static org.apache.phoenix.query.QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB; +import static org.apache.phoenix.query.QueryServices.TRACING_BATCH_SIZE; +import static org.apache.phoenix.query.QueryServices.TRACING_THREAD_POOL_SIZE; +import static org.apache.phoenix.query.QueryServices.TRACING_TRACE_BUFFER_SIZE; import java.util.HashSet; import java.util.Map.Entry; @@ -129,6 +134,10 @@ public class QueryServicesOptions { public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also. public static final long DEFAULT_DRIVER_SHUTDOWN_TIMEOUT_MS = 5 * 1000; // Time to wait in ShutdownHook to exit gracefully. 
+ public static final boolean DEFAULT_TRACING_ENABLED = false; + public static final int DEFAULT_TRACING_THREAD_POOL_SIZE = 5; + public static final int DEFAULT_TRACING_BATCH_SIZE = 100; + public static final int DEFAULT_TRACING_TRACE_BUFFER_SIZE = 1000; @Deprecated //use DEFAULT_MUTATE_BATCH_SIZE_BYTES public final static int DEFAULT_MUTATE_BATCH_SIZE = 100; // Batch size for UPSERT SELECT and DELETE @@ -227,12 +236,12 @@ public class QueryServicesOptions { public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false; public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false; public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true; - + public static final boolean DEFAULT_TRANSACTIONAL = false; public static final boolean DEFAULT_AUTO_FLUSH = false; private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName(); - + public static final String DEFAULT_CONSISTENCY_LEVEL = Consistency.STRONG.toString(); public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false; @@ -241,10 +250,10 @@ public class QueryServicesOptions { public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false; public static final boolean DEFAULT_ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = true; public static final int DEFAULT_MAX_VERSIONS_TRANSACTIONAL = Integer.MAX_VALUE; - + public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false; public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = ""; - + public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000; // QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-queryserver-client @@ -264,9 +273,9 @@ public class QueryServicesOptions { public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10; public static final boolean DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE = true; public static final float DEFAULT_LIMITED_QUERY_SERIAL_THRESHOLD = 0.2f; - + public static final boolean DEFAULT_INDEX_ASYNC_BUILD_ENABLED = true; - + 
public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString(); public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true; public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000; @@ -361,7 +370,10 @@ public class QueryServicesOptions { .setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE) .setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE) .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED) - .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING); + .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING) + .setIfUnset(TRACING_ENABLED, DEFAULT_TRACING_ENABLED) + .setIfUnset(TRACING_BATCH_SIZE, DEFAULT_TRACING_BATCH_SIZE) + .setIfUnset(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set // it to 1, so we'll change it. 
@@ -556,7 +568,33 @@ public class QueryServicesOptions { public int getSpillableGroupByNumSpillFiles() { return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES); } - + + public boolean isTracingEnabled() { + return config.getBoolean(TRACING_ENABLED, DEFAULT_TRACING_ENABLED); + } + + public QueryServicesOptions setTracingEnabled(boolean enable) { + config.setBoolean(TRACING_ENABLED, enable); + return this; + } + + public int getTracingThreadPoolSize() { + return config.getInt(TRACING_THREAD_POOL_SIZE, DEFAULT_TRACING_THREAD_POOL_SIZE); + } + + public int getTracingBatchSize() { + return config.getInt(TRACING_BATCH_SIZE, DEFAULT_TRACING_BATCH_SIZE); + } + + public int getTracingTraceBufferSize() { + return config.getInt(TRACING_TRACE_BUFFER_SIZE, DEFAULT_TRACING_TRACE_BUFFER_SIZE); + } + + public String getTableName() { + return config.get(TRACING_STATS_TABLE_NAME_ATTRIB, DEFAULT_TRACING_STATS_TABLE_NAME); + } + + public boolean isGlobalMetricsEnabled() { return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED); } @@ -647,9 +685,9 @@ public class QueryServicesOptions { public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) { config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis); return this; - + } - + public QueryServicesOptions setUseByteBasedRegex(boolean flag) { config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag); return this; @@ -659,7 +697,7 @@ public class QueryServicesOptions { config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder); return this; } - + public QueryServicesOptions setExtraJDBCArguments(String extraArgs) { config.set(EXTRA_JDBC_ARGUMENTS_ATTRIB, extraArgs); return this; @@ -674,40 +712,40 @@ public class QueryServicesOptions { config.setBoolean(COMMIT_STATS_ASYNC, flag); return this; } - + public QueryServicesOptions setEnableRenewLease(boolean enable) { config.setBoolean(RENEW_LEASE_ENABLED, enable); return this; } - + public QueryServicesOptions 
setIndexHandlerCount(int count) { config.setInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, count); return this; } - + public QueryServicesOptions setMetadataHandlerCount(int count) { config.setInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, count); return this; } - + public QueryServicesOptions setHConnectionPoolCoreSize(int count) { config.setInt(QueryServices.HCONNECTION_POOL_CORE_SIZE, count); return this; } - + public QueryServicesOptions setHConnectionPoolMaxSize(int count) { config.setInt(QueryServices.HCONNECTION_POOL_MAX_SIZE, count); return this; } - + public QueryServicesOptions setMaxThreadsPerHTable(int count) { config.setInt(QueryServices.HTABLE_MAX_THREADS, count); return this; } - + public QueryServicesOptions setDefaultIndexPopulationWaitTime(long waitTime) { config.setLong(INDEX_POPULATION_SLEEP_TIME, waitTime); return this; } - + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java deleted file mode 100644 index e92dd6a..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceMetricSource.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.trace; - -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.metrics2.*; -import org.apache.hadoop.metrics2.lib.Interns; -import org.apache.phoenix.metrics.MetricInfo; -import org.apache.phoenix.metrics.Metrics; -import org.apache.htrace.HTraceConfiguration; -import org.apache.htrace.Span; -import org.apache.htrace.SpanReceiver; -import org.apache.htrace.TimelineAnnotation; -import org.apache.htrace.impl.MilliSpan; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import static org.apache.phoenix.metrics.MetricInfo.*; - -/** - * Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link MetricsSource} in a - * format that we can more easily consume. - * <p> - * <p> - * Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more - * cleanly handle it asyncrhonously.Currently, {@link MilliSpan} submits the span in a synchronized - * block to all the receivers, which could have a lot of overhead if we are submitting to multiple - * receivers. - * <p> - * The format of the generated metrics is this: - * <ol> - * <li>All Metrics from the same span have the same name (allowing correlation in the sink)</li> - * <li>The description of the metric describes what it contains. For instance, - * <ul> - * <li>{@link MetricInfo#PARENT} is the id of the parent of this span. 
(Root span is - * {@link Span#ROOT_SPAN_ID}).</li> - * <li>{@value MetricInfo#START} is the start time of the span</li> - * <li>{@value MetricInfo#END} is the end time of the span</li> - * </ul></li> - * <li>Each span's messages are contained in a {@link MetricsTag} with the same name as above and a - * generic counter for the number of messages (to differentiate messages and provide timeline - * ordering).</li> - * </ol> - * <p> - * <i>So why even submit to metrics2 framework if we only have a single source?</i> - * <p> - * This allows us to make the updates in batches. We might have spans that finish before other spans - * (for instance in the same parent). By batching the updates we can lessen the overhead on the - * client, which is also busy doing 'real' work. <br> - * We could make our own queue and manage batching and filtering and dropping extra metrics, but - * that starts to get complicated fast (its not as easy as it sounds) so we use metrics2 to abstract - * out that pipeline and also provides us flexibility to dump metrics to other sources. - * <p> - * This is a somewhat rough implementation - we do excessive locking for correctness, - * rather than trying to make it fast, for the moment. - */ -public class TraceMetricSource implements SpanReceiver, MetricsSource { - - private static final String EMPTY_STRING = ""; - - private static final String CONTEXT = "tracing"; - - private List<Metric> spans = new ArrayList<Metric>(); - - public TraceMetricSource() { - - MetricsSystem manager = Metrics.initialize(); - - // Register this instance. - // For right now, we ignore the MBean registration issues that show up in DEBUG logs. Basically, - // we need a Jmx MBean compliant name. 
We'll get to a better name when we want that later - manager.register(CONTEXT, "Phoenix call tracing", this); - } - - @Override - public void receiveSpan(Span span) { - Metric builder = new Metric(span); - // add all the metrics for the span - builder.addCounter(Interns.info(SPAN.traceName, EMPTY_STRING), span.getSpanId()); - builder.addCounter(Interns.info(PARENT.traceName, EMPTY_STRING), span.getParentId()); - builder.addCounter(Interns.info(START.traceName, EMPTY_STRING), span.getStartTimeMillis()); - builder.addCounter(Interns.info(END.traceName, EMPTY_STRING), span.getStopTimeMillis()); - // add the tags to the span. They were written in order received so we mark them as such - for (TimelineAnnotation ta : span.getTimelineAnnotations()) { - builder.add(new MetricsTag(Interns.info(TAG.traceName, Long.toString(ta.getTime())), ta - .getMessage())); - } - - // add the annotations. We assume they are serialized as strings and integers, but that can - // change in the future - Map<byte[], byte[]> annotations = span.getKVAnnotations(); - for (Entry<byte[], byte[]> annotation : annotations.entrySet()) { - Pair<String, String> val = - TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue()); - builder.add(new MetricsTag(Interns.info(ANNOTATION.traceName, val.getFirst()), val - .getSecond())); - } - - // add the span to the list we care about - synchronized (this) { - spans.add(builder); - } - } - - @Override - public void getMetrics(MetricsCollector collector, boolean all) { - // add a marker record so we know how many spans are used - // this is also necessary to ensure that we register the metrics source as an MBean (avoiding a - // runtime warning) - MetricsRecordBuilder marker = collector.addRecord(TracingUtils.METRICS_MARKER_CONTEXT); - marker.add(new MetricsTag(new MetricsInfoImpl("stat", "num spans"), Integer - .toString(spans.size()))); - - // actually convert the known spans into metric records as well - synchronized (this) { - for (Metric span 
: spans) { - MetricsRecordBuilder builder = collector.addRecord(new MetricsInfoImpl(TracingUtils - .getTraceMetricName(span.id), span.desc)); - builder.setContext(TracingUtils.METRICS_CONTEXT); - for (Pair<MetricsInfo, Long> metric : span.counters) { - builder.addCounter(metric.getFirst(), metric.getSecond()); - } - for (MetricsTag tag : span.tags) { - builder.add(tag); - } - } - // reset the spans so we don't keep a big chunk of memory around - spans = new ArrayList<Metric>(); - } - } - - @Override - public void close() throws IOException { - // noop - } - - private static class Metric { - - List<Pair<MetricsInfo, Long>> counters = new ArrayList<Pair<MetricsInfo, Long>>(); - List<MetricsTag> tags = new ArrayList<MetricsTag>(); - private String id; - private String desc; - - public Metric(Span span) { - this.id = Long.toString(span.getTraceId()); - this.desc = span.getDescription(); - } - - /** - * @param metricsInfoImpl - * @param startTimeMillis - */ - public void addCounter(MetricsInfo metricsInfoImpl, long startTimeMillis) { - counters.add(new Pair<MetricsInfo, Long>(metricsInfoImpl, startTimeMillis)); - } - - /** - * @param metricsTag - */ - public void add(MetricsTag metricsTag) { - tags.add(metricsTag); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java index 318453f..68b945c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java @@ -14,7 +14,8 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
- */package org.apache.phoenix.trace; + */ +package org.apache.phoenix.trace; import java.sql.Connection; import java.sql.ResultSet; @@ -30,6 +31,7 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.htrace.Span; +import org.apache.htrace.Trace; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.metrics.MetricInfo; import org.apache.phoenix.query.QueryServices; @@ -40,7 +42,7 @@ import com.google.common.base.Joiner; import com.google.common.primitives.Longs; /** - * Read the traces written to phoenix tables by the {@link PhoenixMetricsSink}. + * Read the traces written to phoenix tables by the {@link TraceWriter}. */ public class TraceReader { @@ -54,8 +56,8 @@ public class TraceReader { comma.join(MetricInfo.TRACE.columnName, MetricInfo.PARENT.columnName, MetricInfo.SPAN.columnName, MetricInfo.DESCRIPTION.columnName, MetricInfo.START.columnName, MetricInfo.END.columnName, - MetricInfo.HOSTNAME.columnName, PhoenixMetricsSink.TAG_COUNT, - PhoenixMetricsSink.ANNOTATION_COUNT); + MetricInfo.HOSTNAME.columnName, TraceWriter.TAG_COUNT, + TraceWriter.ANNOTATION_COUNT); } private Connection conn; @@ -177,13 +179,13 @@ public class TraceReader { private Collection<? extends String> getTags(long traceid, long parent, long span, int count) throws SQLException { return getDynamicCountColumns(traceid, parent, span, count, - PhoenixMetricsSink.TAG_FAMILY, MetricInfo.TAG.columnName); + TraceWriter.TAG_FAMILY, MetricInfo.TAG.columnName); } private Collection<? extends String> getAnnotations(long traceid, long parent, long span, int count) throws SQLException { return getDynamicCountColumns(traceid, parent, span, count, - PhoenixMetricsSink.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName); + TraceWriter.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName); } private Collection<? 
extends String> getDynamicCountColumns(long traceid, long parent, @@ -195,7 +197,7 @@ public class TraceReader { // build the column strings, family.column<index> String[] parts = new String[count]; for (int i = 0; i < count; i++) { - parts[i] = PhoenixMetricsSink.getDynamicColumnName(family, columnName, i); + parts[i] = TraceWriter.getDynamicColumnName(family, columnName, i); } // join the columns together String columns = comma.join(parts); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java new file mode 100644 index 0000000..3c71e27 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.trace; + +import java.io.IOException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.htrace.Span; +import org.apache.htrace.SpanReceiver; +import org.apache.htrace.impl.MilliSpan; +import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.query.QueryServicesOptions; + +/** + * Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link TraceWriter} in a + * format that we can more easily consume. + * <p> + * <p> + * Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more + * cleanly handle it asynchronously. Currently, {@link MilliSpan} submits the span in a synchronized + * block to all the receivers, which could have a lot of overhead if we are submitting to multiple + * receivers. + * <p> + * The format of the generated metrics is this: + * <ol> + * <li>All Metrics from the same span have the same trace id (allowing correlation in the sink)</li> + * <li>The description of the metric describes what it contains. For instance, + * <ul> + * <li>{@link MetricInfo#PARENT} is the id of the parent of this span. (Root span is + * {@link Span#ROOT_SPAN_ID}).</li> + * <li>{@link MetricInfo#START} is the start time of the span</li> + * <li>{@link MetricInfo#END} is the end time of the span</li> + * </ul> + * </li> + * </ol> + * <p> + * <i>So why even submit to {@link TraceWriter} if we only have a single source?</i> + * <p> + * This allows us to make the updates in batches. We might have spans that finish before other spans + * (for instance in the same parent). By batching the updates we can lessen the overhead on the + * client, which is also busy doing 'real' work. <br> + * This class is a custom implementation of a metrics queue and handles batch writes to the Phoenix Table + * via another thread. 
Batch size and number of threads are configurable. + * <p> + */ +public class TraceSpanReceiver implements SpanReceiver { + + private static final Log LOG = LogFactory.getLog(TraceSpanReceiver.class); + + private static final int CAPACITY = QueryServicesOptions.withDefaults().getTracingTraceBufferSize(); + + private static BlockingQueue<Span> spanQueue = null; + + public TraceSpanReceiver() { + this.spanQueue = new ArrayBlockingQueue<Span>(CAPACITY); + } + + @Override + public void receiveSpan(Span span) { + if (spanQueue.offer(span)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Span buffered to queue " + span.toJson()); + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("Span NOT buffered due to overflow in queue " + span.toJson()); + } + } + + @Override + public void close() throws IOException { + // noop + } + + protected BlockingQueue<Span> getSpanQueue() { + return spanQueue; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java new file mode 100644 index 0000000..938baa2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceWriter.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.trace; + +import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION; +import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION; +import static org.apache.phoenix.metrics.MetricInfo.END; +import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; +import static org.apache.phoenix.metrics.MetricInfo.PARENT; +import static org.apache.phoenix.metrics.MetricInfo.SPAN; +import static org.apache.phoenix.metrics.MetricInfo.START; +import static org.apache.phoenix.metrics.MetricInfo.TAG; +import static org.apache.phoenix.metrics.MetricInfo.TRACE; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.htrace.Span; +import org.apache.htrace.TimelineAnnotation; +import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import 
org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Sink for the trace spans pushed into the queue by {@link TraceSpanReceiver}. The class instantiates a thread pool + * of configurable size, which will pull the data from queue and write to the Phoenix Trace Table in batches. Various + * configuration options include thread pool size and batch commit size. + */ +public class TraceWriter { + private static final Log LOG = LogFactory.getLog(TraceWriter.class); + + private static final String VARIABLE_VALUE = "?"; + + private static final Joiner COLUMN_JOIN = Joiner.on("."); + static final String TAG_FAMILY = "tags"; + /** + * Count of the number of tags we are storing for this row + */ + static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count"); + + static final String ANNOTATION_FAMILY = "annotations"; + static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count"); + + /** + * Join strings on a comma + */ + private static final Joiner COMMAS = Joiner.on(','); + + private String tableName; + private int BATCH_SIZE; + private int NUM_THREADS; + + protected BlockingQueue<Span> spanQueue; + + private ScheduledExecutorService executor; + + public TraceWriter(String tableName, int numThreads, int batchSize) { + + this.BATCH_SIZE = batchSize; + this.NUM_THREADS = numThreads; + this.tableName = tableName; + + TraceSpanReceiver traceSpanReceiver = Tracing.getTraceSpanReceiver(); + spanQueue = traceSpanReceiver != null ? 
traceSpanReceiver.getSpanQueue() : null; + // TraceWriter should be instantiated only once; however, when multiple JUnit tests run one after another, each of them initializes its own instance. + // To prevent them from interfering with each other, it's safe to clear the queue. + if(spanQueue != null) + spanQueue.clear(); + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + builder.setDaemon(true).setNameFormat("PHOENIX-METRICS-WRITER"); + executor = Executors.newScheduledThreadPool(NUM_THREADS, builder.build()); + for (int i = 0; i < NUM_THREADS; i++) { + executor.scheduleAtFixedRate(new FlushMetrics(), 0, 10, TimeUnit.SECONDS); + } + LOG.info("Writing tracing metrics to phoenix table"); + } + + @VisibleForTesting + public void stop() { + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + executor.shutdownNow(); + } catch (InterruptedException e) { + LOG.error("Failed to stop the thread. ", e); + } + } + + public class FlushMetrics implements Runnable { + + private Connection conn; + private int counter = 0; + + public FlushMetrics() { + conn = getConnection(tableName); + } + + @Override + public void run() { + + if(conn == null) return; + while (!spanQueue.isEmpty()) { + Span span = spanQueue.poll(); + if (null == span) break; + LOG.info("Span received: " + span.toJson()); + addToBatch(span); + counter++; + if (counter >= BATCH_SIZE) { + commitBatch(conn); + counter = 0; + } + } + } + + private void addToBatch(Span span) { + + String stmt = "UPSERT INTO " + tableName + " ("; + // drop it into the queue of things that should be written + List<String> keys = new ArrayList<String>(); + List<Object> values = new ArrayList<Object>(); + // we need to keep variable values in a separate set since they may have spaces, which + // causes the parser to barf.
Instead, we need to add them after the statement is + // prepared + List<String> variableValues = new ArrayList<String>(); + keys.add(TRACE.columnName); + values.add(span.getTraceId()); + + keys.add(DESCRIPTION.columnName); + values.add(VARIABLE_VALUE); + variableValues.add(span.getDescription()); + + keys.add(SPAN.traceName); + values.add(span.getSpanId()); + + keys.add(PARENT.traceName); + values.add(span.getParentId()); + + keys.add(START.traceName); + values.add(span.getStartTimeMillis()); + + keys.add(END.traceName); + values.add(span.getStopTimeMillis()); + + int annotationCount = 0; + int tagCount = 0; + + // add the tags to the span. They were written in order received so we mark them as such + for (TimelineAnnotation ta : span.getTimelineAnnotations()) { + addDynamicEntry(keys, values, variableValues, TAG_FAMILY, Long.toString(ta.getTime()), ta.getMessage(), TAG, tagCount); + tagCount++; + } + + // add the annotations. We assume they are serialized as strings and integers, but that can + // change in the future + Map<byte[], byte[]> annotations = span.getKVAnnotations(); + for (Map.Entry<byte[], byte[]> annotation : annotations.entrySet()) { + Pair<String, String> val = + TracingUtils.readAnnotation(annotation.getKey(), annotation.getValue()); + addDynamicEntry(keys, values, variableValues, ANNOTATION_FAMILY, val.getFirst(), val.getSecond(), ANNOTATION, annotationCount); + annotationCount++; + } + + // add the tag count, now that we know it + keys.add(TAG_COUNT); + // ignore the hostname in the tags, if we know it + values.add(tagCount); + + keys.add(ANNOTATION_COUNT); + values.add(annotationCount); + + // compile the statement together + stmt += COMMAS.join(keys); + stmt += ") VALUES (" + COMMAS.join(values) + ")"; + + if (LOG.isTraceEnabled()) { + LOG.trace("Logging metrics to phoenix table via: " + stmt); + LOG.trace("With tags: " + variableValues); + } + try { + PreparedStatement ps = conn.prepareStatement(stmt); + // add everything that wouldn't/may 
not parse + int index = 1; + for (String tag : variableValues) { + ps.setString(index++, tag); + } + + // Not going through the standard route of using statement.execute() as that code path + // is blocked if the metadata hasn't been upgraded to the new minor release. + MutationPlan plan = ps.unwrap(PhoenixPreparedStatement.class).compileMutation(stmt); + MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState(); + MutationState newState = plan.execute(); + state.join(newState); + } catch (SQLException e) { + LOG.error( + "Could not write metric: \n" + span + " to prepared statement:\n" + stmt, e); + } + } + } + + public static String getDynamicColumnName(String family, String column, int count) { + return COLUMN_JOIN.join(family, column) + count; + } + + private void addDynamicEntry(List<String> keys, List<Object> values, + List<String> variableValues, String family, String desc, String value, MetricInfo metric, + int count) { + // <family><.dynColumn><count> <VARCHAR> + keys.add(getDynamicColumnName(family, metric.columnName, count) + " VARCHAR"); + + // build the annotation value + String val = desc + " - " + value; + values.add(VARIABLE_VALUE); + variableValues.add(val); + } + + protected Connection getConnection(String tableName) { + + try { + // create the phoenix connection + Properties props = new Properties(); + props.setProperty(QueryServices.TRACING_FREQ_ATTRIB, Tracing.Frequency.NEVER.getKey()); + Configuration conf = HBaseConfiguration.create(); + Connection conn = QueryUtil.getConnectionOnServer(props, conf); + + if (!traceTableExists(conn, tableName)) { + createTable(conn, tableName); + } + + LOG.info( + "Created new connection for tracing " + conn.toString() + " Table: " + tableName); + return conn; + } catch (Exception e) { + LOG.error("New connection failed for tracing Table: " + tableName, e); + return null; + } + } + + protected boolean traceTableExists(Connection conn, String traceTableName) throws SQLException { + try
{ + PhoenixRuntime.getTable(conn, traceTableName); + return true; + } catch (TableNotFoundException e) { + return false; + } + } + + /** + * Create the trace table with the given name if it does not already exist. The table is always + * created as non-transactional + * @param conn connection to use when creating the table + * @param table name of the table to create + * @throws SQLException if any phoenix operation fails + */ + protected void createTable(Connection conn, String table) throws SQLException { + // only primary-key columns can be marked non-null + String ddl = + "create table if not exists " + table + "( " + TRACE.columnName + + " bigint not null, " + PARENT.columnName + " bigint not null, " + + SPAN.columnName + " bigint not null, " + DESCRIPTION.columnName + + " varchar, " + START.columnName + " bigint, " + END.columnName + + " bigint, " + HOSTNAME.columnName + " varchar, " + TAG_COUNT + + " smallint, " + ANNOTATION_COUNT + " smallint" + + " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", " + + PARENT.columnName + ", " + SPAN.columnName + "))\n" + + // We have a config parameter that can be set so that tables are + // transactional by default.
If that's set, we still don't want these system + // tables created as transactional tables, make these table non + // transactional + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; + PreparedStatement stmt = conn.prepareStatement(ddl); + stmt.execute(); + } + + protected void commitBatch(Connection conn) { + try { + conn.commit(); + } catch (SQLException e) { + LOG.error("Unable to commit traces on conn: " + conn.toString() + " to table: " + tableName, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java index c9add01..94a89b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/util/Tracing.java @@ -36,7 +36,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.parse.TraceStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.trace.TraceMetricSource; +import org.apache.phoenix.trace.TraceSpanReceiver; import org.apache.htrace.Sampler; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -45,6 +45,7 @@ import org.apache.htrace.Tracer; import org.apache.htrace.impl.ProbabilitySampler; import org.apache.htrace.wrappers.TraceCallable; import org.apache.htrace.wrappers.TraceRunnable; +import org.apache.phoenix.trace.TraceWriter; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -258,6 +259,7 @@ public class Tracing { * Track if the tracing system has been initialized for phoenix */ private static boolean initialized = false; + private static TraceSpanReceiver traceSpanReceiver = null; /** * Add 
the phoenix span receiver so we can log the traces. We have a single trace source for the @@ -266,7 +268,12 @@ public class Tracing { public synchronized static void addTraceMetricsSource() { try { if (!initialized) { - Trace.addReceiver(new TraceMetricSource()); + traceSpanReceiver = new TraceSpanReceiver(); + Trace.addReceiver(traceSpanReceiver); + if(QueryServicesOptions.withDefaults().isTracingEnabled()) { + QueryServicesOptions options = QueryServicesOptions.withDefaults(); + new TraceWriter(options.getTableName(), options.getTracingThreadPoolSize(), options.getTracingBatchSize()); + } } } catch (RuntimeException e) { LOG.warn("Tracing will outputs will not be written to any metrics sink! No " @@ -281,6 +288,10 @@ public class Tracing { initialized = true; } + public static TraceSpanReceiver getTraceSpanReceiver() { + return traceSpanReceiver; + } + public static boolean isTraceOn(String traceOption) { Preconditions.checkArgument(traceOption != null); if(traceOption.equalsIgnoreCase("ON")) return true; http://git-wip-us.apache.org/repos/asf/phoenix/blob/7c157ec9/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java b/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java index eabcaca..7f307da 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/trace/TraceMetricsSourceTest.java @@ -20,24 +20,18 @@ package org.apache.phoenix.trace; import static org.junit.Assert.assertTrue; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.metrics2.MetricsCollector; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import 
org.apache.htrace.Span; import org.apache.htrace.impl.MilliSpan; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; /** - * Test that the @{link TraceMetricSource} correctly handles different kinds of traces + * Test that the @{link TraceSpanReceiver} correctly handles different kinds of traces */ public class TraceMetricsSourceTest { @BeforeClass public static void setup() throws Exception{ - DefaultMetricsSystem.setMiniClusterMode(true); } /** @@ -48,16 +42,19 @@ public class TraceMetricsSourceTest { public void testNonIntegerAnnotations(){ Span span = getSpan(); // make sure its less than the length of an integer + byte[] value = Bytes.toBytes("a"); byte[] someInt = Bytes.toBytes(1); - assertTrue(someInt.length >value.length); + assertTrue(someInt.length > value.length); // an annotation that is not an integer span.addKVAnnotation(Bytes.toBytes("key"), value); // Create the sink and write the span - TraceMetricSource source = new TraceMetricSource(); + TraceSpanReceiver source = new TraceSpanReceiver(); source.receiveSpan(span); + + assertTrue(source.getSpanQueue().size() == 1); } @Test @@ -67,29 +64,13 @@ public class TraceMetricsSourceTest { // add annotation through the phoenix interfaces TracingUtils.addAnnotation(span, "message", 10); - TraceMetricSource source = new TraceMetricSource(); + TraceSpanReceiver source = new TraceSpanReceiver(); source.receiveSpan(span); - } - - /** - * If the source does not write any metrics when there are no spans, i.e. when initialized, - * then the metrics system will discard the source, so it needs to always emit some metrics. 
- */ - @Test - public void testWritesInfoWhenNoSpans(){ - TraceMetricSource source = new TraceMetricSource(); - MetricsCollector collector = Mockito.mock(MetricsCollector.class); - MetricsRecordBuilder builder = Mockito.mock(MetricsRecordBuilder.class); - Mockito.when(collector.addRecord(Mockito.anyString())).thenReturn(builder); - - source.getMetrics(collector, true); - // verify that we add a record and that the record has some info - Mockito.verify(collector).addRecord(Mockito.anyString()); - Mockito.verify(builder).add(Mockito.any(MetricsTag.class)); + assertTrue(source.getSpanQueue().size() == 1); } private Span getSpan(){ return new MilliSpan("test span", 0, 1 , 2, "pid"); } -} \ No newline at end of file +}
