PHOENIX-4835 LoggingPhoenixConnection should log metrics upon connection close


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/eb79c5b1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/eb79c5b1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/eb79c5b1

Branch: refs/heads/omid2
Commit: eb79c5b11b95bcd261c150ecd0f709712be2217c
Parents: dbbb112
Author: Karan Mehta <[email protected]>
Authored: Thu Aug 16 15:08:12 2018 -0700
Committer: Karan Mehta <[email protected]>
Committed: Fri Aug 17 12:02:22 2018 -0700

----------------------------------------------------------------------
 .../monitoring/BasePhoenixMetricsIT.java        | 128 +++++++++++++
 .../monitoring/PhoenixLoggingMetricsIT.java     | 181 +++++++++++++++++++
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 170 ++---------------
 .../phoenix/jdbc/LoggingPhoenixConnection.java  |  12 +-
 4 files changed, 332 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
new file mode 100644
index 0000000..5c016f6
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
+
+    static final int MAX_RETRIES = 5;
+
+    static final List<MetricType> mutationMetricsToSkip =
+    Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME);
+    static final List<MetricType> readMetricsToSkip =
+    Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME,
+            MetricType.TASK_EXECUTION_TIME, MetricType.TASK_END_TO_END_TIME,
+            MetricType.COUNT_MILLS_BETWEEN_NEXTS);
+    static final String CUSTOM_URL_STRING = "SESSION";
+    static final AtomicInteger numConnections = new AtomicInteger(0);
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Phoenix Global client metrics are enabled by default
+        // Enable request metric collection at the driver level
+        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
+        // disable renewing leases as this will force spooling to happen.
+        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        // need the non-test driver for some tests that check number of 
hconnections, etc.
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+
+    }
+
+    Connection insertRowsInTable(String tableName, long numRows) throws 
SQLException {
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+        return conn;
+    }
+
+    void assertReadMetricsForMutatingSql(String tableName, long 
tableSaltBuckets,
+                                                 Map<String, Map<MetricType, 
Long>> readMetrics) {
+        assertTrue("No read metrics present when there should have been!", 
readMetrics.size() > 0);
+        int numTables = 0;
+        for (Map.Entry<String, Map<MetricType, Long>> entry : 
readMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for read metrics", 
tableName, t);
+            numTables++;
+            Map<MetricType, Long> p = entry.getValue();
+            assertTrue("No read metrics present when there should have been", 
p.size() > 0);
+            for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
+                MetricType metricType = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricType.equals(TASK_EXECUTED_COUNTER)) {
+                    assertEquals(tableSaltBuckets, metricValue);
+                } else if (metricType.equals(SCAN_BYTES)) {
+                    assertTrue("Scan bytes read should be greater than zero", 
metricValue > 0);
+                }
+            }
+        }
+        assertEquals("There should have been read metrics only for one table: 
" + tableName, 1, numTables);
+    }
+
+    void assertMutationMetrics(String tableName, int numRows, Map<String, 
Map<MetricType, Long>> mutationMetrics) {
+        assertTrue("No mutation metrics present when there should have been", 
mutationMetrics.size() > 0);
+        for (Map.Entry<String, Map<MetricType, Long>> entry : 
mutationMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for mutation metrics", 
tableName, t);
+            Map<MetricType, Long> p = entry.getValue();
+            assertEquals("There should have been four metrics", 4, p.size());
+            for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
+                MetricType metricType = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
+                    assertEquals("Mutation batch sizes didn't match!", 
numRows, metricValue);
+                } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) 
{
+                    assertTrue("Mutation commit time should be greater than 
zero", metricValue > 0);
+                } else if (metricType.equals(MetricType.MUTATION_BYTES)) {
+                    assertTrue("Mutation bytes size should be greater than 
zero", metricValue > 0);
+                } else if 
(metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
+                    assertEquals("Zero failed mutations expected", 0, 
metricValue);
+                }
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
new file mode 100644
index 0000000..02640e7
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixMetricsLog;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class PhoenixLoggingMetricsIT extends BasePhoenixMetricsIT {
+
+    private static final int NUM_ROWS = 10;
+
+    private final Map<MetricType, Long> overAllQueryMetricsMap = 
Maps.newHashMap();
+    private final Map<String, Map<MetricType, Long>> requestReadMetricsMap = 
Maps.newHashMap();
+    private final Map<String, Map<MetricType, Long>> mutationWriteMetricsMap = 
Maps.newHashMap();
+    private final Map<String, Map<MetricType, Long>> mutationReadMetricsMap = 
Maps.newHashMap();
+
+    private String tableName1;
+    private String tableName2;
+    private LoggingPhoenixConnection loggedConn;
+
+    @Before
+    public void beforeTest() throws Exception {
+        clearAllTestMetricMaps();
+        tableName1 = generateUniqueName();
+        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL 
PRIMARY KEY, V VARCHAR)";
+        Connection setupConn = DriverManager.getConnection(getUrl());
+        setupConn.createStatement().execute(ddl);
+        setupConn.close();
+        insertRowsInTable(tableName1, NUM_ROWS);
+
+        tableName2 = generateUniqueName();
+        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY 
KEY, V VARCHAR)";
+        setupConn = DriverManager.getConnection(getUrl());
+        setupConn.createStatement().execute(ddl);
+        setupConn.close();
+
+        Connection testConn = DriverManager.getConnection(getUrl());
+        loggedConn = getLoggingPhoenixConnection(testConn);
+    }
+
+    @Test
+    public void testPhoenixMetricsLoggedOnCommit() throws Exception {
+        // run SELECT to verify read metrics are logged
+        String query = "SELECT * FROM " + tableName1;
+        Statement stmt = loggedConn.createStatement();
+        ResultSet rs = stmt.executeQuery(query);
+        while (rs.next()) {
+        }
+        rs.close();
+        assertTrue("Read metrics for not found for " + tableName1,
+                requestReadMetricsMap.get(tableName1).size() > 0);
+        assertTrue("Overall read metrics for not found ", 
overAllQueryMetricsMap.size() > 0);
+
+        // run UPSERT SELECT to verify mutation metrics are logged
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " 
+ tableName1;
+        loggedConn.createStatement().executeUpdate(upsertSelect);
+
+        // Assert that metrics are logged upon commit
+        loggedConn.commit();
+        assertTrue("Mutation write metrics for not found for " + tableName2,
+                mutationWriteMetricsMap.get(tableName2).size() > 0);
+        assertMutationMetrics(tableName2, NUM_ROWS, mutationWriteMetricsMap);
+        assertTrue("Mutation read metrics for not found for " + tableName1,
+                mutationReadMetricsMap.get(tableName1).size() > 0);
+        assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap);
+
+        clearAllTestMetricMaps();
+
+        // Assert that close() after commit() does not log mutation metrics again
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged again.",
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics are not logged again.",
+                mutationReadMetricsMap.size() == 0);
+
+        clearAllTestMetricMaps();
+
+        // Assert that a repeated close() does not log mutation metrics either
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged again.",
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics are not logged again.",
+                mutationReadMetricsMap.size() == 0);
+    }
+
+    @Test
+    public void testPhoenixMetricsLoggedOnClose() throws Exception {
+        // run SELECT to verify read metrics are logged
+        String query = "SELECT * FROM " + tableName1;
+        Statement stmt = loggedConn.createStatement();
+        ResultSet rs = stmt.executeQuery(query);
+        while (rs.next()) {
+        }
+        rs.close();
+        assertTrue("Read metrics for not found for " + tableName1,
+                requestReadMetricsMap.get(tableName1).size() > 0);
+        assertTrue("Overall read metrics for not found ", 
overAllQueryMetricsMap.size() > 0);
+
+        // run UPSERT SELECT to verify mutation metrics are logged
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " 
+ tableName1;
+        loggedConn.createStatement().executeUpdate(upsertSelect);
+
+        // Autocommit is turned off by default and loggedConn is never committed,
+        // hence no mutation write metrics are expected on connection close
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged for " + tableName2,
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics for not found for " + tableName1,
+                mutationReadMetricsMap.get(tableName1).size() > 0);
+        assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap);
+
+        clearAllTestMetricMaps();
+
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged again.",
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics are not logged again.",
+                mutationReadMetricsMap.size() == 0);
+    }
+
+    void clearAllTestMetricMaps() {
+        overAllQueryMetricsMap.clear();
+        requestReadMetricsMap.clear();
+        mutationWriteMetricsMap.clear();
+        mutationReadMetricsMap.clear();
+    }
+
+    LoggingPhoenixConnection getLoggingPhoenixConnection(Connection conn) {
+        return new LoggingPhoenixConnection(conn, new PhoenixMetricsLog() {
+            @Override
+            public void logOverAllReadRequestMetrics(
+                    Map<MetricType, Long> overAllQueryMetrics) {
+                overAllQueryMetricsMap.putAll(overAllQueryMetrics);
+            }
+
+            @Override
+            public void logRequestReadMetrics(
+                    Map<String, Map<MetricType, Long>> requestReadMetrics) {
+                requestReadMetricsMap.putAll(requestReadMetrics);
+            }
+
+            @Override
+            public void logWriteMetricsfoForMutations(
+                    Map<String, Map<MetricType, Long>> mutationWriteMetrics) {
+                mutationWriteMetricsMap.putAll(mutationWriteMetrics);
+            }
+
+            @Override
+            public void logReadMetricInfoForMutationsSinceLastReset(
+                    Map<String, Map<MetricType, Long>> mutationReadMetrics) {
+                mutationReadMetricsMap.putAll(mutationReadMetrics);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 4c5c592..0882cec 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -34,7 +34,6 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FIL
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
@@ -59,51 +58,34 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.jdbc.PhoenixMetricsLog;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
-
-    private static final List<MetricType> mutationMetricsToSkip =
-            Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME);
-    private static final List<MetricType> readMetricsToSkip =
-            Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME,
-                MetricType.TASK_EXECUTION_TIME, 
MetricType.TASK_END_TO_END_TIME,
-                MetricType.COUNT_MILLS_BETWEEN_NEXTS);
-    private static final String CUSTOM_URL_STRING = "SESSION";
-    private static final AtomicInteger numConnections = new AtomicInteger(0);
-
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-        // Enable request metric collection at the driver level
-        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
-        // disable renewing leases as this will force spooling to happen.
-        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-        // need the non-test driver for some tests that check number of 
hconnections, etc.
-        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-    }
+/**
+ * Tests that
+ * 1. Phoenix Global metrics are exposed via
+ *   a. PhoenixRuntime b. Hadoop-Metrics2 defined sinks
+ * 2. Phoenix Request level metrics are exposed via
+ *   a. PhoenixRuntime
+ */
+public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixMetricsIT.class);
 
     @Test
     public void testResetGlobalPhoenixMetrics() {
@@ -740,19 +722,6 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
                 expectedTableNames.size() == 0);
     }
 
-    private Connection insertRowsInTable(String tableName, long numRows) 
throws SQLException {
-        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
-        Connection conn = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = conn.prepareStatement(dml);
-        for (int i = 1; i <= numRows; i++) {
-            stmt.setString(1, "key" + i);
-            stmt.setString(2, "value" + i);
-            stmt.executeUpdate();
-        }
-        conn.commit();
-        return conn;
-    }
-
     // number of records read should be number of bytes at the end
     public static final long SCAN_BYTES_DELTA = 1;
 
@@ -800,52 +769,6 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private void assertReadMetricsForMutatingSql(String tableName, long 
tableSaltBuckets,
-            Map<String, Map<MetricType, Long>> readMetrics) {
-        assertTrue("No read metrics present when there should have been!", 
readMetrics.size() > 0);
-        int numTables = 0;
-        for (Entry<String, Map<MetricType, Long>> entry : 
readMetrics.entrySet()) {
-            String t = entry.getKey();
-            assertEquals("Table name didn't match for read metrics", 
tableName, t);
-            numTables++;
-            Map<MetricType, Long> p = entry.getValue();
-            assertTrue("No read metrics present when there should have been", 
p.size() > 0);
-            for (Entry<MetricType, Long> metric : p.entrySet()) {
-               MetricType metricType = metric.getKey();
-                long metricValue = metric.getValue();
-                if (metricType.equals(TASK_EXECUTED_COUNTER)) {
-                    assertEquals(tableSaltBuckets, metricValue);
-                } else if (metricType.equals(SCAN_BYTES)) {
-                    assertTrue("Scan bytes read should be greater than zero", 
metricValue > 0);
-                }
-            }
-        }
-        assertEquals("There should have been read metrics only for one table: 
" + tableName, 1, numTables);
-    }
-
-    private void assertMutationMetrics(String tableName, int numRows, 
Map<String, Map<MetricType, Long>> mutationMetrics) {
-        assertTrue("No mutation metrics present when there should have been", 
mutationMetrics.size() > 0);
-        for (Entry<String, Map<MetricType, Long>> entry : 
mutationMetrics.entrySet()) {
-            String t = entry.getKey();
-            assertEquals("Table name didn't match for mutation metrics", 
tableName, t);
-            Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been four metrics", 4, p.size());
-            for (Entry<MetricType, Long> metric : p.entrySet()) {
-               MetricType metricType = metric.getKey();
-                long metricValue = metric.getValue();
-                if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
-                    assertEquals("Mutation batch sizes didn't match!", 
numRows, metricValue);
-                } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) 
{
-                    assertTrue("Mutation commit time should be greater than 
zero", metricValue > 0);
-                } else if (metricType.equals(MetricType.MUTATION_BYTES)) {
-                    assertTrue("Mutation bytes size should be greater than 
zero", metricValue > 0);
-                } else if 
(metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
-                    assertEquals("Zero failed mutations expected", 0, 
metricValue);
-                }
-            }
-        }
-    }
-
     @Test
     public void testGetConnectionsForSameUrlConcurrently()  throws Exception {
         // establish url and quorum. Need to use PhoenixDriver and not 
PhoenixTestDriver
@@ -1020,74 +943,5 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
             return c;
         }
     }
-    
-    @Test
-    public void testPhoenixMetricsLogged() throws Exception {
-        final Map<MetricType, Long> overAllQueryMetricsMap = Maps.newHashMap();
-        final Map<String, Map<MetricType, Long>> requestReadMetricsMap = 
Maps.newHashMap();
-        final Map<String, Map<MetricType, Long>> mutationWriteMetricsMap = 
Maps.newHashMap();
-        final Map<String, Map<MetricType, Long>> mutationReadMetricsMap = 
Maps.newHashMap();
-
-        String tableName1 = generateUniqueName();
-        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL 
PRIMARY KEY, V VARCHAR)";
-        Connection ddlConn = DriverManager.getConnection(getUrl());
-        ddlConn.createStatement().execute(ddl);
-        ddlConn.close();
-        insertRowsInTable(tableName1, 10);
-
-        String tableName2 = generateUniqueName();
-        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY 
KEY, V VARCHAR)";
-        ddlConn = DriverManager.getConnection(getUrl());
-        ddlConn.createStatement().execute(ddl);
-        ddlConn.close();
 
-        Connection conn = DriverManager.getConnection(getUrl());
-        LoggingPhoenixConnection protectedConn =
-                new LoggingPhoenixConnection(conn, new PhoenixMetricsLog() {
-                    @Override
-                    public void logOverAllReadRequestMetrics(
-                            Map<MetricType, Long> overAllQueryMetrics) {
-                        overAllQueryMetricsMap.putAll(overAllQueryMetrics);
-                    }
-
-                    @Override
-                    public void logRequestReadMetrics(
-                            Map<String, Map<MetricType, Long>> 
requestReadMetrics) {
-                        requestReadMetricsMap.putAll(requestReadMetrics);
-                    }
-
-                    @Override
-                    public void logWriteMetricsfoForMutations(
-                            Map<String, Map<MetricType, Long>> 
mutationWriteMetrics) {
-                        mutationWriteMetricsMap.putAll(mutationWriteMetrics);
-                    }
-
-                    @Override
-                    public void logReadMetricInfoForMutationsSinceLastReset(
-                            Map<String, Map<MetricType, Long>> 
mutationReadMetrics) {
-                        mutationReadMetricsMap.putAll(mutationReadMetrics);
-                    }
-                });
-        
-        // run SELECT to verify read metrics are logged
-        String query = "SELECT * FROM " + tableName1;
-        Statement stmt = protectedConn.createStatement();
-        ResultSet rs = stmt.executeQuery(query);
-        while (rs.next()) {
-        }
-        rs.close();
-        assertTrue("Read metrics for not found for " + tableName1,
-            requestReadMetricsMap.get(tableName1).size() > 0);
-        assertTrue("Overall read metrics for not found ", 
overAllQueryMetricsMap.size() > 0);
-
-        // run UPSERT SELECT to verify mutation metrics are logged
-        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " 
+ tableName1;
-        protectedConn.createStatement().executeUpdate(upsertSelect);
-        protectedConn.commit();
-        assertTrue("Mutation write metrics for not found for " + tableName2,
-            mutationWriteMetricsMap.get(tableName2).size() > 0);
-        assertTrue("Mutation read metrics for not found for " + tableName1,
-            mutationReadMetricsMap.get(tableName1).size() > 0);
-        protectedConn.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/eb79c5b1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
index d98da83..9a2e00f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
@@ -98,7 +98,7 @@ public class LoggingPhoenixConnection extends 
DelegateConnection {
         return new LoggingPhoenixPreparedStatement(super.prepareStatement(sql, 
columnNames),
                 phoenixMetricsLog);
     }
-    
+
     @Override
     public void commit() throws SQLException {
         super.commit();
@@ -107,4 +107,14 @@ public class LoggingPhoenixConnection extends 
DelegateConnection {
         PhoenixRuntime.resetMetrics(conn);
     }
 
+    @Override
+    public void close() throws SQLException {
+        try {
+            
phoenixMetricsLog.logWriteMetricsfoForMutations(PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn));
+            
phoenixMetricsLog.logReadMetricInfoForMutationsSinceLastReset(PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(conn));
+            PhoenixRuntime.resetMetrics(conn);
+        } finally {
+            super.close();
+        }
+    }
 }

Reply via email to