Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/5#discussion_r15383971
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixTableMetricsWriter.java ---
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.trace;
+
+import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+import static org.apache.phoenix.metrics.MetricInfo.TAG;
+import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+import static org.apache.phoenix.metrics.MetricInfo.END;
+import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+import static org.apache.phoenix.metrics.MetricInfo.START;
+import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.metrics.MetricsWriter;
+import org.apache.phoenix.metrics.PhoenixAbstractMetric;
+import org.apache.phoenix.metrics.PhoenixMetricTag;
+import org.apache.phoenix.metrics.PhoenixMetricsRecord;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+
+/**
+ * Sink that writes phoenix metrics to a phoenix table
+ * <p>
+ * Each metric record should only correspond to a single completed span. Each span is only updated
+ * in the phoenix table <i>once</i>
+ */
+public class PhoenixTableMetricsWriter implements MetricsWriter {
+
+ private static final String VARIABLE_VALUE = "?";
+
+ public static final Log LOG = LogFactory.getLog(PhoenixTableMetricsWriter.class);
+
+ private static final Joiner COLUMN_JOIN = Joiner.on(".");
+ static final String TAG_FAMILY = "tags";
+ /** Count of the number of tags we are storing for this row */
+ static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count");
+
+ static final String ANNOTATION_FAMILY = "annotations";
+ static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count");
+
+ /** Join strings on a comma */
+ private static final Joiner COMMAS = Joiner.on(',');
+
+ private Connection conn;
+
+ private String table;
+
+ @Override
+ public void initialize() {
+ try {
+ // create the phoenix connection
+ Configuration conf = HBaseConfiguration.create();
+ Connection conn = QueryUtil.getConnection(conf);
+ // enable bulk loading when we have enough data
+ conn.setAutoCommit(true);
+
+ String tableName =
+ conf.get(TracingCompat.TARGET_TABLE_CONF_KEY,
+ TracingCompat.DEFAULT_STATS_TABLE_NAME);
+
+ initializeInternal(conn, tableName);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initializeInternal(Connection conn, String tableName) throws SQLException {
+ this.conn = conn;
+
+ // ensure that the target table already exists
+ createTable(conn, tableName);
+ }
+
+ /**
+ * Used for <b>TESTING ONLY</b>
+ * <p>
+ * Initialize the connection and setup the table to use the
+ * {@link TracingCompat#DEFAULT_STATS_TABLE_NAME}
+ * @param conn to store for upserts and to create the table (if necessary)
+ * @throws SQLException if any phoenix operation fails
+ */
+ @VisibleForTesting
+ public void initForTesting(Connection conn) throws SQLException {
+ initializeInternal(conn, TracingCompat.DEFAULT_STATS_TABLE_NAME);
+ }
+
+ /**
+ * Create a stats table with the given name. Stores the name for use later when creating upsert
+ * statements
+ * @param conn connection to use when creating the table
+ * @param table name of the table to create
+ * @throws SQLException if any phoenix operations fails
+ */
+ private void createTable(Connection conn, String table) throws SQLException {
+ // only primary-key columns can be marked non-null
+ String ddl =
+ "create table if not exists " + table + "( " +
+ TRACE.columnName + " bigint not null, " +
+ PARENT.columnName + " bigint not null, " +
+ SPAN.columnName + " bigint not null, " +
+ DESCRIPTION.columnName + " varchar, " +
+ START.columnName + " bigint, " +
+ END.columnName + " bigint, " +
+ HOSTNAME.columnName + " varchar, " +
+ TAG_COUNT + " smallint, " +
+ ANNOTATION_COUNT + " smallint" +
+ " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName
+ ", "
+ + PARENT.columnName + ", " + SPAN.columnName +
"))\n";
+ PreparedStatement stmt = conn.prepareStatement(ddl);
+ stmt.execute();
+ this.table = table;
+ }
+
+ @Override
+ public void flush() {
+ try {
+ this.conn.commit();
+ this.conn.rollback();
+ } catch (SQLException e) {
+ LOG.error("Failed to commit changes to table", e);
+ }
+ }
+
+ /**
+ * Add a new metric record to be written.
+ * @param record
+ */
+ @Override
+ public void addMetrics(PhoenixMetricsRecord record) {
+ // its not a tracing record, we are done. This could also be handled by filters, but safer
+ // to do it here, in case it gets misconfigured
+ if (!record.name().startsWith(TracingCompat.METRIC_SOURCE_KEY)) {
+ return;
+ }
+ String stmt = "UPSERT INTO " + table + " (";
--- End diff ---
Ideally, if possible, you'd want to build this UPSERT statement once, call
conn.prepareStatement(upsert) on it, and then here just bind the values to the
bind variables and execute it.
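
A minimal sketch of that approach (illustrative only: it covers just the fixed
columns from the DDL above, leaves the dynamic tag/annotation columns aside, and
uses made-up helper names prepareUpsert/writeSpan):

    // Sketch: prepare the UPSERT once (e.g. right after createTable, where the
    // table name is fixed) and cache it, instead of rebuilding the SQL per record.
    private PreparedStatement upsertStmt;

    private void prepareUpsert(Connection conn, String table) throws SQLException {
        String upsert = "UPSERT INTO " + table + " ("
                + TRACE.columnName + ", " + PARENT.columnName + ", " + SPAN.columnName + ", "
                + DESCRIPTION.columnName + ", " + START.columnName + ", " + END.columnName + ", "
                + HOSTNAME.columnName + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
        this.upsertStmt = conn.prepareStatement(upsert);
    }

    // called from addMetrics: bind the values for one completed span and execute
    private void writeSpan(long traceId, long parentId, long spanId, String description,
            long start, long end, String hostname) throws SQLException {
        upsertStmt.setLong(1, traceId);
        upsertStmt.setLong(2, parentId);
        upsertStmt.setLong(3, spanId);
        upsertStmt.setString(4, description);
        upsertStmt.setLong(5, start);
        upsertStmt.setLong(6, end);
        upsertStmt.setString(7, hostname);
        upsertStmt.execute();
    }

That keeps the flush()/commit semantics as they are in the patch; only the
statement construction moves out of addMetrics.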
---