PHOENIX-4835 LoggingPhoenixConnection should log metrics upon connection close
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eb79c5b1 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eb79c5b1 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eb79c5b1 Branch: refs/heads/omid2 Commit: eb79c5b11b95bcd261c150ecd0f709712be2217c Parents: dbbb112 Author: Karan Mehta <[email protected]> Authored: Thu Aug 16 15:08:12 2018 -0700 Committer: Karan Mehta <[email protected]> Committed: Fri Aug 17 12:02:22 2018 -0700 ---------------------------------------------------------------------- .../monitoring/BasePhoenixMetricsIT.java | 128 +++++++++++++ .../monitoring/PhoenixLoggingMetricsIT.java | 181 +++++++++++++++++++ .../phoenix/monitoring/PhoenixMetricsIT.java | 170 ++--------------- .../phoenix/jdbc/LoggingPhoenixConnection.java | 12 +- 4 files changed, 332 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java new file mode 100644 index 0000000..5c016f6 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; +import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { + + static final int MAX_RETRIES = 5; + + static final List<MetricType> mutationMetricsToSkip = + Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME); + static final List<MetricType> readMetricsToSkip = + Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME, + MetricType.TASK_EXECUTION_TIME, MetricType.TASK_END_TO_END_TIME, + MetricType.COUNT_MILLS_BETWEEN_NEXTS); + static final String CUSTOM_URL_STRING = "SESSION"; + static final AtomicInteger numConnections = new AtomicInteger(0); + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Phoenix Global client metrics are enabled by default + // Enable request metric collection at the driver level + props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); + // disable renewing leases as this will force spooling to happen. + props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + // need the non-test driver for some tests that check number of hconnections, etc. + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + + } + + Connection insertRowsInTable(String tableName, long numRows) throws SQLException { + String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement(dml); + for (int i = 1; i <= numRows; i++) { + stmt.setString(1, "key" + i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + conn.commit(); + return conn; + } + + void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets, + Map<String, Map<MetricType, Long>> readMetrics) { + assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0); + int numTables = 0; + for (Map.Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) { + String t = entry.getKey(); + assertEquals("Table name didn't match for read metrics", tableName, t); + numTables++; + Map<MetricType, Long> p = entry.getValue(); + assertTrue("No read metrics present when there should have been", p.size() > 0); + for (Map.Entry<MetricType, Long> metric : p.entrySet()) { + MetricType metricType = metric.getKey(); + long metricValue = metric.getValue(); + if (metricType.equals(TASK_EXECUTED_COUNTER)) { + assertEquals(tableSaltBuckets, metricValue); + } else if (metricType.equals(SCAN_BYTES)) { + assertTrue("Scan bytes read should be greater than zero", metricValue > 0); + } + } + } + assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables); + } + + void assertMutationMetrics(String tableName, int numRows, Map<String, Map<MetricType, Long>> mutationMetrics) { + assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0); + for (Map.Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) { + String t = entry.getKey(); + assertEquals("Table name didn't match for mutation metrics", tableName, t); + Map<MetricType, Long> p = entry.getValue(); + assertEquals("There should have been four metrics", 4, p.size()); + for (Map.Entry<MetricType, Long> metric : p.entrySet()) { + MetricType metricType = metric.getKey(); + long metricValue = metric.getValue(); + if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) { + assertEquals("Mutation batch sizes didn't match!", numRows, metricValue); + } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) { + assertTrue("Mutation commit time should be greater than zero", metricValue > 0); + } else if (metricType.equals(MetricType.MUTATION_BYTES)) { + assertTrue("Mutation bytes size should be greater than zero", metricValue > 0); + } else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) { + assertEquals("Zero failed mutations expected", 0, metricValue); + } + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java new file mode 100644 index 0000000..02640e7 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import com.google.common.collect.Maps; +import org.apache.phoenix.jdbc.LoggingPhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixMetricsLog; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +public class PhoenixLoggingMetricsIT extends BasePhoenixMetricsIT { + + private static final int NUM_ROWS = 10; + + private final Map<MetricType, Long> overAllQueryMetricsMap = Maps.newHashMap(); + private final Map<String, Map<MetricType, Long>> requestReadMetricsMap = Maps.newHashMap(); + private final Map<String, Map<MetricType, Long>> mutationWriteMetricsMap = Maps.newHashMap(); + private final Map<String, Map<MetricType, Long>> mutationReadMetricsMap = Maps.newHashMap(); + + private String tableName1; + private String tableName2; + private LoggingPhoenixConnection loggedConn; + + @Before + public void beforeTest() throws Exception { + clearAllTestMetricMaps(); + tableName1 = generateUniqueName(); + String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + Connection setupConn = DriverManager.getConnection(getUrl()); + setupConn.createStatement().execute(ddl); + setupConn.close(); + insertRowsInTable(tableName1, NUM_ROWS); + + tableName2 = generateUniqueName(); + ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; + setupConn = DriverManager.getConnection(getUrl()); + setupConn.createStatement().execute(ddl); + setupConn.close(); + + Connection testConn = DriverManager.getConnection(getUrl()); + loggedConn = getLoggingPhoenixConnection(testConn); + } + + @Test + public void testPhoenixMetricsLoggedOnCommit() throws Exception { + // run SELECT to verify read metrics are logged + String query = "SELECT * FROM " + tableName1; + Statement stmt = loggedConn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + while (rs.next()) { + } + rs.close(); + assertTrue("Read metrics for not found for " + tableName1, + requestReadMetricsMap.get(tableName1).size() > 0); + assertTrue("Overall read metrics for not found ", overAllQueryMetricsMap.size() > 0); + + // run UPSERT SELECT to verify mutation metrics are logged + String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1; + loggedConn.createStatement().executeUpdate(upsertSelect); + + // Assert that metrics are logged upon commit + loggedConn.commit(); + assertTrue("Mutation write metrics for not found for " + tableName2, + mutationWriteMetricsMap.get(tableName2).size() > 0); + assertMutationMetrics(tableName2, NUM_ROWS, mutationWriteMetricsMap); + assertTrue("Mutation read metrics for not found for " + tableName1, + mutationReadMetricsMap.get(tableName1).size() > 0); + assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap); + + clearAllTestMetricMaps(); + + // Assert that metrics logging happens only once + loggedConn.close(); + assertTrue("Mutation write metrics are not logged again.", + mutationWriteMetricsMap.size() == 0); + assertTrue("Mutation read metrics are not logged again.", + mutationReadMetricsMap.size() == 0); + + clearAllTestMetricMaps(); + + // Assert that metrics logging happens only once again + loggedConn.close(); + assertTrue("Mutation write metrics are not logged again.", + mutationWriteMetricsMap.size() == 0); + assertTrue("Mutation read metrics are not logged again.", + mutationReadMetricsMap.size() == 0); + } + + @Test + public void testPhoenixMetricsLoggedOnClose() throws Exception { + // run SELECT to verify read metrics are logged + String query = "SELECT * FROM " + tableName1; + Statement stmt = loggedConn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + while (rs.next()) { + } + rs.close(); + assertTrue("Read metrics for not found for " + tableName1, + requestReadMetricsMap.get(tableName1).size() > 0); + assertTrue("Overall read metrics for not found ", overAllQueryMetricsMap.size() > 0); + + // run UPSERT SELECT to verify mutation metrics are logged + String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1; + loggedConn.createStatement().executeUpdate(upsertSelect); + + // Autocommit is turned off by default + // Hence mutation metrics are not expected during connection close + loggedConn.close(); + assertTrue("Mutation write metrics are not logged for " + tableName2, + mutationWriteMetricsMap.size() == 0); + assertTrue("Mutation read metrics for not found for " + tableName1, + mutationReadMetricsMap.get(tableName1).size() > 0); + assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap); + + clearAllTestMetricMaps(); + + loggedConn.close(); + assertTrue("Mutation write metrics are not logged again.", + mutationWriteMetricsMap.size() == 0); + assertTrue("Mutation read metrics are not logged again.", + mutationReadMetricsMap.size() == 0); + } + + void clearAllTestMetricMaps() { + overAllQueryMetricsMap.clear(); + requestReadMetricsMap.clear(); + mutationWriteMetricsMap.clear(); + mutationReadMetricsMap.clear(); + } + + LoggingPhoenixConnection getLoggingPhoenixConnection(Connection conn) { + return new LoggingPhoenixConnection(conn, new PhoenixMetricsLog() { + @Override + public void logOverAllReadRequestMetrics( + Map<MetricType, Long> overAllQueryMetrics) { + overAllQueryMetricsMap.putAll(overAllQueryMetrics); + } + + @Override + public void logRequestReadMetrics( + Map<String, Map<MetricType, Long>> requestReadMetrics) { + requestReadMetricsMap.putAll(requestReadMetrics); + } + + @Override + public void logWriteMetricsfoForMutations( + Map<String, Map<MetricType, Long>> mutationWriteMetrics) { + mutationWriteMetricsMap.putAll(mutationWriteMetrics); + } + + @Override + public void logReadMetricInfoForMutationsSinceLastReset( + Map<String, Map<MetricType, Long>> mutationReadMetrics) { + mutationReadMetricsMap.putAll(mutationReadMetrics); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 4c5c592..0882cec 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -34,7 +34,6 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FIL import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME; import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; -import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; @@ -59,51 +58,34 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.jdbc.LoggingPhoenixConnection; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; -import org.apache.phoenix.jdbc.PhoenixMetricsLog; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; import com.google.common.base.Joiner; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { - - private static final List<MetricType> mutationMetricsToSkip = - Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME); - private static final List<MetricType> readMetricsToSkip = - Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME, - MetricType.TASK_EXECUTION_TIME, MetricType.TASK_END_TO_END_TIME, - MetricType.COUNT_MILLS_BETWEEN_NEXTS); - private static final String CUSTOM_URL_STRING = "SESSION"; - private static final AtomicInteger numConnections = new AtomicInteger(0); - - @BeforeClass - public static void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(1); - // Enable request metric collection at the driver level - props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); - // disable renewing leases as this will force spooling to happen. - props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); - // need the non-test driver for some tests that check number of hconnections, etc. - DriverManager.registerDriver(PhoenixDriver.INSTANCE); - } +/** + * Tests that + * 1. Phoenix Global metrics are exposed via + * a. PhoenixRuntime b. Hadoop-Metrics2 defined sinks + * 2. Phoenix Request level metrics are exposed via + * a. PhoenixRuntime + */ +public class PhoenixMetricsIT extends BasePhoenixMetricsIT { + + private static final Log LOG = LogFactory.getLog(PhoenixMetricsIT.class); @Test public void testResetGlobalPhoenixMetrics() { @@ -740,19 +722,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { expectedTableNames.size() == 0); } - private Connection insertRowsInTable(String tableName, long numRows) throws SQLException { - String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; - Connection conn = DriverManager.getConnection(getUrl()); - PreparedStatement stmt = conn.prepareStatement(dml); - for (int i = 1; i <= numRows; i++) { - stmt.setString(1, "key" + i); - stmt.setString(2, "value" + i); - stmt.executeUpdate(); - } - conn.commit(); - return conn; - } - // number of records read should be number of bytes at the end public static final long SCAN_BYTES_DELTA = 1; @@ -800,52 +769,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } } - private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets, - Map<String, Map<MetricType, Long>> readMetrics) { - assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0); - int numTables = 0; - for (Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) { - String t = entry.getKey(); - assertEquals("Table name didn't match for read metrics", tableName, t); - numTables++; - Map<MetricType, Long> p = entry.getValue(); - assertTrue("No read metrics present when there should have been", p.size() > 0); - for (Entry<MetricType, Long> metric : p.entrySet()) { - MetricType metricType = metric.getKey(); - long metricValue = metric.getValue(); - if (metricType.equals(TASK_EXECUTED_COUNTER)) { - assertEquals(tableSaltBuckets, metricValue); - } else if (metricType.equals(SCAN_BYTES)) { - assertTrue("Scan bytes read should be greater than zero", metricValue > 0); - } - } - } - assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables); - } - - private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<MetricType, Long>> mutationMetrics) { - assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0); - for (Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) { - String t = entry.getKey(); - assertEquals("Table name didn't match for mutation metrics", tableName, t); - Map<MetricType, Long> p = entry.getValue(); - assertEquals("There should have been four metrics", 4, p.size()); - for (Entry<MetricType, Long> metric : p.entrySet()) { - MetricType metricType = metric.getKey(); - long metricValue = metric.getValue(); - if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) { - assertEquals("Mutation batch sizes didn't match!", numRows, metricValue); - } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) { - assertTrue("Mutation commit time should be greater than zero", metricValue > 0); - } else if (metricType.equals(MetricType.MUTATION_BYTES)) { - assertTrue("Mutation bytes size should be greater than zero", metricValue > 0); - } else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) { - assertEquals("Zero failed mutations expected", 0, metricValue); - } - } - } - } - @Test public void testGetConnectionsForSameUrlConcurrently() throws Exception { // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver @@ -1020,74 +943,5 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { return c; } } - - @Test - public void testPhoenixMetricsLogged() throws Exception { - final Map<MetricType, Long> overAllQueryMetricsMap = Maps.newHashMap(); - final Map<String, Map<MetricType, Long>> requestReadMetricsMap = Maps.newHashMap(); - final Map<String, Map<MetricType, Long>> mutationWriteMetricsMap = Maps.newHashMap(); - final Map<String, Map<MetricType, Long>> mutationReadMetricsMap = Maps.newHashMap(); - - String tableName1 = generateUniqueName(); - String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; - Connection ddlConn = DriverManager.getConnection(getUrl()); - ddlConn.createStatement().execute(ddl); - ddlConn.close(); - insertRowsInTable(tableName1, 10); - - String tableName2 = generateUniqueName(); - ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; - ddlConn = DriverManager.getConnection(getUrl()); - ddlConn.createStatement().execute(ddl); - ddlConn.close(); - Connection conn = DriverManager.getConnection(getUrl()); - LoggingPhoenixConnection protectedConn = - new LoggingPhoenixConnection(conn, new PhoenixMetricsLog() { - @Override - public void logOverAllReadRequestMetrics( - Map<MetricType, Long> overAllQueryMetrics) { - overAllQueryMetricsMap.putAll(overAllQueryMetrics); - } - - @Override - public void logRequestReadMetrics( - Map<String, Map<MetricType, Long>> requestReadMetrics) { - requestReadMetricsMap.putAll(requestReadMetrics); - } - - @Override - public void logWriteMetricsfoForMutations( - Map<String, Map<MetricType, Long>> mutationWriteMetrics) { - mutationWriteMetricsMap.putAll(mutationWriteMetrics); - } - - @Override - public void logReadMetricInfoForMutationsSinceLastReset( - Map<String, Map<MetricType, Long>> mutationReadMetrics) { - mutationReadMetricsMap.putAll(mutationReadMetrics); - } - }); - - // run SELECT to verify read metrics are logged - String query = "SELECT * FROM " + tableName1; - Statement stmt = protectedConn.createStatement(); - ResultSet rs = stmt.executeQuery(query); - while (rs.next()) { - } - rs.close(); - assertTrue("Read metrics for not found for " + tableName1, - requestReadMetricsMap.get(tableName1).size() > 0); - assertTrue("Overall read metrics for not found ", overAllQueryMetricsMap.size() > 0); - - // run UPSERT SELECT to verify mutation metrics are logged - String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1; - protectedConn.createStatement().executeUpdate(upsertSelect); - protectedConn.commit(); - assertTrue("Mutation write metrics for not found for " + tableName2, - mutationWriteMetricsMap.get(tableName2).size() > 0); - assertTrue("Mutation read metrics for not found for " + tableName1, - mutationReadMetricsMap.get(tableName1).size() > 0); - protectedConn.close(); - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java index d98da83..9a2e00f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java @@ -98,7 +98,7 @@ public class LoggingPhoenixConnection extends DelegateConnection { return new LoggingPhoenixPreparedStatement(super.prepareStatement(sql, columnNames), phoenixMetricsLog); } - + @Override public void commit() throws SQLException { super.commit(); @@ -107,4 +107,14 @@ public class LoggingPhoenixConnection extends DelegateConnection { PhoenixRuntime.resetMetrics(conn); } + @Override + public void close() throws SQLException { + try { + phoenixMetricsLog.logWriteMetricsfoForMutations(PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn)); + phoenixMetricsLog.logReadMetricInfoForMutationsSinceLastReset(PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(conn)); + PhoenixRuntime.resetMetrics(conn); + } finally { + super.close(); + } + } }
