PHOENIX-2715 Query Log (Ankit Singhal)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/28d74053
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/28d74053
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/28d74053

Branch: refs/heads/4.x-HBase-0.98
Commit: 28d74053f8984e2c79f787b9043ba8ed5252d903
Parents: fce9af5
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Sat Apr 14 11:33:26 2018 +0530
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Sat Apr 14 11:33:26 2018 +0530

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |  10 +-
 .../MigrateSystemTablesToSystemNamespaceIT.java |  40 ++-
 .../end2end/QueryDatabaseMetaDataIT.java        |   4 +
 .../apache/phoenix/end2end/QueryLoggerIT.java   | 356 +++++++++++++++++++
 .../end2end/TenantSpecificTablesDDLIT.java      |   2 +
 .../phoenix/compile/StatementContext.java       |  10 +
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  17 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |  18 +
 .../phoenix/jdbc/PhoenixPreparedStatement.java  |  11 +-
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  35 ++
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  67 +++-
 .../java/org/apache/phoenix/log/LogLevel.java   |  22 ++
 .../java/org/apache/phoenix/log/LogWriter.java  |  51 +++
 .../log/QueryLogDetailsEventHandler.java        |  63 ++++
 .../org/apache/phoenix/log/QueryLogInfo.java    |  87 +++++
 .../org/apache/phoenix/log/QueryLogState.java   |  22 ++
 .../org/apache/phoenix/log/QueryLogger.java     | 145 ++++++++
 .../log/QueryLoggerDefaultExceptionHandler.java |  51 +++
 .../phoenix/log/QueryLoggerDisruptor.java       | 117 ++++++
 .../org/apache/phoenix/log/QueryLoggerUtil.java |  62 ++++
 .../org/apache/phoenix/log/RingBufferEvent.java |  93 +++++
 .../phoenix/log/RingBufferEventTranslator.java  |  53 +++
 .../org/apache/phoenix/log/TableLogWriter.java  | 128 +++++++
 .../phoenix/monitoring/ReadMetricQueue.java     |  24 +-
 .../phoenix/query/ConnectionQueryServices.java  |   6 +
 .../query/ConnectionQueryServicesImpl.java      |  44 ++-
 .../query/ConnectionlessQueryServicesImpl.java  |  24 +-
 .../query/DelegateConnectionQueryServices.java  |  12 +
 .../apache/phoenix/query/QueryConstants.java    | 121 ++-----
 .../org/apache/phoenix/query/QueryServices.java |   4 +
 .../phoenix/query/QueryServicesOptions.java     |   9 +-
 phoenix-flume/pom.xml                           |   6 -
 phoenix-kafka/pom.xml                           |   6 -
 pom.xml                                         |   6 +
 34 files changed, 1558 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index a537c96..3cb3682 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -345,12 +345,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.lmax</groupId>
-      <artifactId>disruptor</artifactId>
-      <version>3.2.0</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.cloudera.htrace</groupId>
       <artifactId>htrace-core</artifactId>
     </dependency>
@@ -479,5 +473,9 @@
       <artifactId>i18n-util</artifactId>
       <version>${i18n-util.version}</version>
     </dependency>
+       <dependency>
+        <groupId>com.lmax</groupId>
+        <artifactId>disruptor</artifactId>
+      </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index e7187d6..627e453 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -16,6 +16,26 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -38,31 +58,15 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
 @Category(NeedsOwnMiniClusterTest.class)
 public class MigrateSystemTablesToSystemNamespaceIT extends BaseTest {
 
     private static final Set<String> PHOENIX_SYSTEM_TABLES = new 
HashSet<>(Arrays.asList(
             "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", 
"SYSTEM.FUNCTION",
-            "SYSTEM.MUTEX"));
+            "SYSTEM.MUTEX","SYSTEM.LOG"));
     private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = 
new HashSet<>(
             Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", 
"SYSTEM:FUNCTION",
-                    "SYSTEM:MUTEX"));
+                    "SYSTEM:MUTEX","SYSTEM:LOG"));
     private static final String SCHEMA_NAME = "MIGRATETEST";
     private static final String TABLE_NAME =
             SCHEMA_NAME + "." + 
MigrateSystemTablesToSystemNamespaceIT.class.getSimpleName().toUpperCase();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index 7a2d3e1..c7c8ebf 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -166,6 +166,10 @@ public class QueryDatabaseMetaDataIT extends 
ParallelStatsDisabledIT {
             assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
             assertTrue(rs.next());
             assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+            assertEquals(PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, 
rs.getString("TABLE_NAME"));
+            assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
+            assertTrue(rs.next());
+            assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
             assertEquals(TYPE_SEQUENCE, rs.getString("TABLE_NAME"));
             assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
             assertTrue(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
new file mode 100644
index 0000000..d6cc096
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryLoggerIT.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.InetAddress;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.log.LogLevel;
+import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class QueryLoggerIT extends BaseUniqueNamesOwnClusterIT {
+
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Enable request metric collection at the driver level
+        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
+        // disable renewing leases as this will force spooling to happen.
+        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        // need the non-test driver for some tests that check number of 
hconnections, etc.
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    } 
+    
+
+    @Test
+    public void testDebugLogs() throws Exception {
+        String tableName = generateUniqueName();
+        createTableAndInsertValues(tableName, true);
+        Properties props= new Properties();
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name());
+        Connection conn = DriverManager.getConnection(getUrl(),props);
+        
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG);
+        String query = "SELECT * FROM " + tableName;
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        StatementContext context = ((PhoenixResultSet)rs).getContext();
+        String queryId = context.getQueryLogger().getQueryId();
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+        ResultSet explainRS = conn.createStatement().executeQuery("Explain " + 
query);
+
+        String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+        int delay = 5000;
+
+        // sleep for sometime to let query log committed
+        Thread.sleep(delay);
+        while (rs.next()) {
+            if (rs.getString(QUERY_ID).equals(queryId)) {
+                foundQueryLog = true;
+                assertEquals(rs.getString(BIND_PARAMETERS), null);
+                assertEquals(rs.getString(USER), 
System.getProperty("user.name"));
+                assertEquals(rs.getString(CLIENT_IP), 
InetAddress.getLocalHost().getHostAddress());
+                assertEquals(rs.getString(EXPLAIN_PLAN), 
QueryUtil.getExplainPlan(explainRS));
+                assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), 
context.getScan().toJSON());
+                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 10);
+                assertEquals(rs.getString(QUERY), query);
+                assertEquals(rs.getString(QUERY_STATUS), 
QueryLogState.COMPLETED.toString());
+                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(TENANT_ID), null);
+                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+                assertEquals(rs.getString(EXCEPTION_TRACE),null);
+            }else{
+                //confirm we are not logging system queries
+                
assertFalse(rs.getString(QUERY).toString().contains(SYSTEM_CATALOG_SCHEMA));
+            }
+        }
+        assertTrue(foundQueryLog);
+        conn.close();
+    }
+    
+    @Test
+    public void testLogSampling() throws Exception {
+        String tableName = generateUniqueName();
+        createTableAndInsertValues(tableName, true);
+        Properties props= new Properties();
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name());
+        props.setProperty(QueryServices.LOG_SAMPLE_RATE, "0.5");
+        Connection conn = DriverManager.getConnection(getUrl(),props);
+        
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.DEBUG);
+        String query = "SELECT * FROM " + tableName;
+        int count=100;
+        for (int i = 0; i < count; i++) {
+            conn.createStatement().executeQuery(query);
+        }
+        
+        String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
+        ResultSet rs = conn.createStatement().executeQuery(logQuery);
+        int delay = 5000;
+
+        // sleep for sometime to let query log committed
+        Thread.sleep(delay);
+        int logCount=0;
+        while (rs.next()) {
+            logCount++;
+        }
+        
+        //sampling rate is 0.5 , but with lesser count, uniformity of thread 
random may not be perfect, so taking 0.75 for comparison 
+        assertTrue(logCount != 0 && logCount < count * 0.75);
+        conn.close();
+    }
+    
+    @Test
+    public void testInfoLogs() throws Exception{
+        String tableName = generateUniqueName();
+        createTableAndInsertValues(tableName, true);
+        Properties props= new Properties();
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.INFO.name());
+        Connection conn = DriverManager.getConnection(getUrl(),props);
+        
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.INFO);
+        String query = "SELECT * FROM " + tableName;
+        
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        StatementContext context = ((PhoenixResultSet)rs).getContext();
+        String queryId = context.getQueryLogger().getQueryId();
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+
+        String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+        int delay = 5000;
+
+        // sleep for sometime to let query log committed
+        Thread.sleep(delay);
+        while (rs.next()) {
+            if (rs.getString(QUERY_ID).equals(queryId)) {
+                foundQueryLog = true;
+                assertEquals(rs.getString(USER), 
System.getProperty("user.name"));
+                assertEquals(rs.getString(CLIENT_IP), 
InetAddress.getLocalHost().getHostAddress());
+                assertEquals(rs.getString(EXPLAIN_PLAN), null);
+                assertEquals(rs.getString(GLOBAL_SCAN_DETAILS),null);
+                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+                assertEquals(rs.getString(QUERY), query);
+                assertEquals(rs.getString(QUERY_STATUS),null);
+                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(TENANT_ID), null);
+                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) == null);
+            }
+        }
+        assertTrue(foundQueryLog);
+        conn.close();
+    }
+    
+    @Test
+    public void testWithLoggingOFF() throws Exception{
+        String tableName = generateUniqueName();
+        createTableAndInsertValues(tableName, true);
+        Properties props= new Properties();
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.OFF.name());
+        Connection conn = DriverManager.getConnection(getUrl(),props);
+        
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),LogLevel.OFF);
+        String query = "SELECT * FROM " + tableName;
+        
+        ResultSet rs = conn.createStatement().executeQuery(query);
+        StatementContext context = ((PhoenixResultSet)rs).getContext();
+        String queryId = context.getQueryLogger().getQueryId();
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+
+        String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+        int delay = 5000;
+
+        // sleep for sometime to let query log committed
+        Thread.sleep(delay);
+        while (rs.next()) {
+            if (rs.getString(QUERY_ID).equals(queryId)) {
+                foundQueryLog = true;
+            }
+        }
+        assertFalse(foundQueryLog);
+        conn.close();
+    }
+    
+
+    @Test
+    public void testPreparedStatementWithTrace() throws Exception{
+        testPreparedStatement(LogLevel.TRACE);   
+    }
+    
+    @Test
+    public void testPreparedStatementWithDebug() throws Exception{
+        testPreparedStatement(LogLevel.DEBUG);
+    }
+            
+    private void testPreparedStatement(LogLevel loglevel) throws Exception{
+        String tableName = generateUniqueName();
+        createTableAndInsertValues(tableName, true);
+        Properties props= new Properties();
+        props.setProperty(QueryServices.LOG_LEVEL, loglevel.name());
+        Connection conn = DriverManager.getConnection(getUrl(),props);
+        
assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(),loglevel);
+        
+        String query = "SELECT * FROM " + tableName +" where V = ?";
+        
+        PreparedStatement pstmt = conn.prepareStatement(query);
+        pstmt.setString(1, "value5");
+        ResultSet rs = pstmt.executeQuery();
+        StatementContext context = ((PhoenixResultSet)rs).getContext();
+        String queryId = context.getQueryLogger().getQueryId();
+        while (rs.next()) {
+            rs.getString(1);
+            rs.getString(2);
+        }
+        ResultSet explainRS = conn.createStatement()
+                .executeQuery("Explain " + "SELECT * FROM " + tableName + " 
where V = 'value5'");
+        String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
+        rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+        int delay = 5000;
+        
+        // sleep for sometime to let query log committed
+        Thread.sleep(delay);
+        while (rs.next()) {
+            if (rs.getString(QUERY_ID).equals(queryId)) {
+                foundQueryLog = true;
+                assertEquals(rs.getString(BIND_PARAMETERS), loglevel == 
LogLevel.TRACE ? "value5" : null);
+                assertEquals(rs.getString(USER), 
System.getProperty("user.name"));
+                assertEquals(rs.getString(CLIENT_IP), 
InetAddress.getLocalHost().getHostAddress());
+                assertEquals(rs.getString(EXPLAIN_PLAN), 
QueryUtil.getExplainPlan(explainRS));
+                assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), 
context.getScan().toJSON());
+                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 1);
+                assertEquals(rs.getString(QUERY), query);
+                assertEquals(rs.getString(QUERY_STATUS), 
QueryLogState.COMPLETED.toString());
+                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                assertEquals(rs.getString(TENANT_ID), null);
+                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+            }
+        }
+        assertTrue(foundQueryLog);
+        conn.close();
+    }
+    
+    
+    
+    @Test
+    public void testFailedQuery() throws Exception {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        props.setProperty(QueryServices.LOG_LEVEL, LogLevel.DEBUG.name());
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        assertEquals(conn.unwrap(PhoenixConnection.class).getLogLevel(), 
LogLevel.DEBUG);
+        // Table does not exists
+        String query = "SELECT * FROM " + tableName;
+
+        try {
+            conn.createStatement().executeQuery(query);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(e.getErrorCode(), 
SQLExceptionCode.TABLE_UNDEFINED.getErrorCode());
+        }
+        String logQuery = "SELECT * FROM " + SYSTEM_CATALOG_SCHEMA + ".\"" + 
SYSTEM_LOG_TABLE + "\"";
+        ResultSet rs = conn.createStatement().executeQuery(logQuery);
+        boolean foundQueryLog = false;
+        int delay = 5000;
+
+        // sleep for sometime to let query log committed
+        Thread.sleep(delay);
+        while (rs.next()) {
+            if 
(QueryLogState.FAILED.name().equals(rs.getString(QUERY_STATUS))) {
+                foundQueryLog = true;
+                assertEquals(rs.getString(USER), 
System.getProperty("user.name"));
+                assertEquals(rs.getString(CLIENT_IP), 
InetAddress.getLocalHost().getHostAddress());
+                assertEquals(rs.getString(EXPLAIN_PLAN), null);
+                assertEquals(rs.getString(GLOBAL_SCAN_DETAILS), null);
+                assertEquals(rs.getLong(NO_OF_RESULTS_ITERATED), 0);
+                assertEquals(rs.getString(QUERY), query);
+                
assertTrue(rs.getString(EXCEPTION_TRACE).contains(SQLExceptionCode.TABLE_UNDEFINED.getMessage()));
+                assertTrue(System.currentTimeMillis() - 
rs.getTimestamp(START_TIME).getTime() > delay);
+                assertTrue(rs.getString(TOTAL_EXECUTION_TIME) != null);
+            }
+        }
+        assertTrue(foundQueryLog);
+        conn.close();
+    }
+
+    private static void createTableAndInsertValues(String tableName, boolean 
resetGlobalMetricsAfterTableCreate)
+            throws Exception {
+        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL 
PRIMARY KEY, V VARCHAR)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(ddl);
+        // executing 10 upserts/mutations.
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= 10; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index f8dfd65..34a1312 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -493,6 +493,8 @@ public class TenantSpecificTablesDDLIT extends 
BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, 
SYSTEM_FUNCTION_TABLE, SYSTEM);
             assertTrue(rs.next());
+            assertTableMetaData(rs, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE, PTableType.SYSTEM);
+            assertTrue(rs.next());
             assertTableMetaData(rs, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.TYPE_SEQUENCE, PTableType.SYSTEM);
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE, PTableType.SYSTEM);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 39d8525..c105046 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.log.QueryLogger;
 import org.apache.phoenix.monitoring.OverAllQueryMetrics;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.parse.SelectStatement;
@@ -83,6 +84,7 @@ public class StatementContext {
     private Map<SelectStatement, Object> subqueryResults;
     private final ReadMetricQueue readMetricsQueue;
     private final OverAllQueryMetrics overAllQueryMetrics;
+    private QueryLogger queryLogger;
     
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
@@ -306,5 +308,13 @@ public class StatementContext {
     public OverAllQueryMetrics getOverallQueryMetrics() {
         return overAllQueryMetrics;
     }
+
+    public void setQueryLogger(QueryLogger queryLogger) {
+       this.queryLogger=queryLogger;
+    }
+
+    public QueryLogger getQueryLogger() {
+        return queryLogger;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 7c9f478..bb3d447 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -72,11 +72,11 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.iterate.TableResultIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
 import org.apache.phoenix.query.MetaDataMutated;
 import org.apache.phoenix.query.PropertyPolicyProvider;
@@ -166,6 +166,8 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
     private final LinkedBlockingQueue<WeakReference<TableResultIterator>> 
scannerQueue;
     private TableResultIteratorFactory tableResultIteratorFactory;
     private boolean isRunningUpgrade;
+    private LogLevel logLevel;
+    private Double logSamplingRate;
 
     static {
         Tracing.addTraceMetricsSource();
@@ -370,6 +372,10 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         this.scannerQueue = new LinkedBlockingQueue<>();
         this.tableResultIteratorFactory = new 
DefaultTableResultIteratorFactory();
         this.isRunningUpgrade = isRunningUpgrade;
+        this.logLevel= 
LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL,
+                QueryServicesOptions.DEFAULT_LOGGING_LEVEL));
+        this.logSamplingRate = 
Double.parseDouble(this.services.getProps().get(QueryServices.LOG_SAMPLE_RATE,
+                QueryServicesOptions.DEFAULT_LOG_SAMPLE_RATE));
         GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
     }
 
@@ -640,6 +646,7 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
             } finally {
                 services.removeConnection(this);
             }
+            
         } finally {
             isClosed = true;
             GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
@@ -1258,4 +1265,12 @@ public class PhoenixConnection implements Connection, 
MetaDataMutated, SQLClosea
         this.isRunningUpgrade = isRunningUpgrade;
     }
 
+    public LogLevel getLogLevel(){
+        return this.logLevel;
+    }
+    
+    public Double getLogSamplingRate(){
+        return this.logSamplingRate;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 0cad455..4329b4b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -347,6 +347,24 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final String USE_STATS_FOR_PARALLELIZATION = 
"USE_STATS_FOR_PARALLELIZATION";
     public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = 
Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION);
 
+    
+    //SYSTEM:LOG
+    public static final String SYSTEM_LOG_TABLE = "LOG";
+    public static final String QUERY_ID = "QUERY_ID";
+    public static final String USER = "USER";
+    public static final String CLIENT_IP = "CLIENT_IP";
+    public static final String QUERY = "QUERY";
+    public static final String EXPLAIN_PLAN = "EXPLAIN_PLAN";
+    public static final String TOTAL_EXECUTION_TIME = "TOTAL_EXECUTION_TIME";
+    public static final String NO_OF_RESULTS_ITERATED = 
"NO_OF_RESULTS_ITERATED";
+    public static final String QUERY_STATUS = "QUERY_STATUS";
+    public static final String EXCEPTION_TRACE = "EXCEPTION_TRACE";
+    public static final String GLOBAL_SCAN_DETAILS = "GLOBAL_SCAN_DETAILS";
+    public static final String SCAN_METRICS_JSON = "SCAN_METRICS_JSON";
+    public static final String START_TIME = "START_TIME";
+    public static final String BIND_PARAMETERS = "BIND_PARAMETERS";
+            
+    
     PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException {
         this.emptyResultSet = new 
PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, 
new StatementContext(new PhoenixStatement(connection), false));
         this.connection = connection;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 71ecb8d..914ea33 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -169,7 +169,13 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Prepar
             throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
             .build().buildException();
         }
-        return execute(statement);
+        if (statement.getOperation().isMutation()) {
+            executeMutation(statement);
+            return false;
+        }
+        executeQuery(statement, createQueryLogger(statement,query));
+        return true;
+        
     }
 
     @Override
@@ -183,7 +189,8 @@ public class PhoenixPreparedStatement extends 
PhoenixStatement implements Prepar
         if (statement.getOperation().isMutation()) {
             throw new 
ExecuteQueryNotApplicableException(statement.getOperation());
         }
-        return executeQuery(statement);
+        
+        return executeQuery(statement,createQueryLogger(statement,query));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index d3ec151..909b59d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -50,6 +50,9 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.log.QueryLogInfo;
+import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryLogger;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.OverAllQueryMetrics;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -72,6 +75,9 @@ import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.SQLCloseable;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 
 
 
@@ -122,6 +128,14 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
     private boolean isClosed = false;
     private boolean wasNull = false;
     private boolean firstRecordRead = false;
+
+    private QueryLogger queryLogger;
+
+    private Long count = 0L;
+
+    private QueryLogState logStatus = QueryLogState.COMPLETED;
+
+    private RuntimeException exception;
     
     public PhoenixResultSet(ResultIterator resultIterator, RowProjector 
rowProjector, StatementContext ctx) throws SQLException {
         this.rowProjector = rowProjector;
@@ -130,6 +144,7 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
         this.statement = context.getStatement();
         this.readMetricsQueue = context.getReadMetricsQueue();
         this.overAllQueryMetrics = context.getOverallQueryMetrics();
+        this.queryLogger = context.getQueryLogger();
     }
     
     @Override
@@ -779,17 +794,37 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable {
             currentRow = scanner.next();
             if (currentRow == null) {
                 close();
+            }else{
+                count++;
             }
             rowProjector.reset();
         } catch (RuntimeException e) {
+            this.logStatus=QueryLogState.FAILED;
             // FIXME: Expression.evaluate does not throw SQLException
             // so this will unwrap throws from that.
+            this.exception = e;
             if (e.getCause() instanceof SQLException) {
                 throw (SQLException) e.getCause();
             }
             throw e;
+        }finally{
+            if (currentRow == null && queryLogger != null ) {
+                if (queryLogger.isDebugEnabled()) {
+                    Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
+                    queryLogBuilder.put(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, 
count);
+                    queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
+                            System.currentTimeMillis() - 
queryLogger.getStartTime());
+                    
+                    if (this.exception != null) {
+                        queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I,
+                                
Throwables.getStackTraceAsString(this.exception));
+                    }
+                    queryLogger.log(logStatus, queryLogBuilder.build());
+                }
+            }
         }
         if (currentRow == null) {
+            
             overAllQueryMetrics.endQuery();
             overAllQueryMetrics.stopResultSetWatch();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 2bd563f..29edb7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -92,6 +92,10 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.log.QueryLogInfo;
+import org.apache.phoenix.log.QueryLogState;
+import org.apache.phoenix.log.QueryLogger;
+import org.apache.phoenix.log.QueryLoggerUtil;
 import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AddJarsStatement;
@@ -184,6 +188,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.math.IntMath;
@@ -266,26 +272,19 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
         return new PhoenixResultSet(iterator, projector, context);
     }
     
-    protected boolean execute(final CompilableStatement stmt) throws 
SQLException {
-        if (stmt.getOperation().isMutation()) {
-            executeMutation(stmt);
-            return false;
-        }
-        executeQuery(stmt);
-        return true;
-    }
-    
     protected QueryPlan optimizeQuery(CompilableStatement stmt) throws 
SQLException {
         QueryPlan plan = stmt.compilePlan(this, 
Sequence.ValueOp.VALIDATE_SEQUENCE);
         return connection.getQueryServices().getOptimizer().optimize(this, 
plan);
     }
     
-    protected PhoenixResultSet executeQuery(final CompilableStatement stmt) 
throws SQLException {
-      return executeQuery(stmt,true);
+    protected PhoenixResultSet executeQuery(final CompilableStatement stmt, 
final QueryLogger queryLogger)
+            throws SQLException {
+        return executeQuery(stmt, true, queryLogger);
     }
     private PhoenixResultSet executeQuery(final CompilableStatement stmt,
-        final boolean doRetryOnMetaNotFoundError) throws SQLException {
+        final boolean doRetryOnMetaNotFoundError, final QueryLogger 
queryLogger) throws SQLException {
         GLOBAL_SELECT_SQL_COUNTER.increment();
+        
         try {
             return CallRunner.run(
                 new CallRunner.CallableThrowable<PhoenixResultSet, 
SQLException>() {
@@ -294,6 +293,7 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                     final long startTime = System.currentTimeMillis();
                     try {
                         PhoenixConnection conn = getConnection();
+                        
                         if (conn.getQueryServices().isUpgradeRequired() && 
!conn.isRunningUpgrade()
                                 && stmt.getOperation() != Operation.UPGRADE) {
                             throw new UpgradeRequiredException();
@@ -314,6 +314,13 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                             logger.debug(LogUtil.addCustomAnnotations("Explain 
plan: " + explainPlan, connection));
                         }
                         StatementContext context = plan.getContext();
+                        context.setQueryLogger(queryLogger);
+                        if(queryLogger.isDebugEnabled()){
+                            Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
+                            queryLogBuilder.put(QueryLogInfo.EXPLAIN_PLAN_I, 
QueryUtil.getExplainPlan(resultIterator));
+                            
queryLogBuilder.put(QueryLogInfo.GLOBAL_SCAN_DETAILS_I, 
context.getScan()!=null?context.getScan().toString():null);
+                            queryLogger.log(QueryLogState.COMPILED, 
queryLogBuilder.build());
+                        }
                         context.getOverallQueryMetrics().startQuery();
                         PhoenixResultSet rs = newResultSet(resultIterator, 
plan.getProjector(), plan.getContext());
                         resultSets.add(rs);
@@ -335,7 +342,8 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                                 logger.debug("Reloading table "+ 
e.getTableName()+" data from server");
                             if(new 
MetaDataClient(connection).updateCache(connection.getTenantId(),
                                 e.getSchemaName(), e.getTableName(), 
true).wasUpdated()){
-                                return executeQuery(stmt, false);
+                                //TODO we can log retry count and error for 
debugging in LOG table
+                                return executeQuery(stmt, false, queryLogger);
                             }
                         }
                         throw e;
@@ -355,6 +363,13 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 }
                 }, PhoenixContextExecutor.inContext());
         }catch (Exception e) {
+            if (queryLogger.isDebugEnabled()) {
+                Builder<QueryLogInfo, Object> queryLogBuilder = 
ImmutableMap.builder();
+                queryLogBuilder.put(QueryLogInfo.TOTAL_EXECUTION_TIME_I,
+                        System.currentTimeMillis() - 
queryLogger.getStartTime());
+                queryLogBuilder.put(QueryLogInfo.EXCEPTION_TRACE_I, 
Throwables.getStackTraceAsString(e));
+                queryLogger.log(QueryLogState.FAILED, queryLogBuilder.build());
+            }
             Throwables.propagateIfInstanceOf(e, SQLException.class);
             Throwables.propagate(e);
             throw new IllegalStateException(); // Can't happen as 
Throwables.propagate() always throws
@@ -1664,16 +1679,37 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
         return compileMutation(stmt, sql);
     }
 
+    public QueryLogger createQueryLogger(CompilableStatement stmt, String sql) 
throws SQLException {
+        boolean isSystemTable=false;
+        if(stmt instanceof ExecutableSelectStatement){
+            TableNode from = ((ExecutableSelectStatement)stmt).getFrom();
+            if(from instanceof NamedTableNode){
+                String schemaName = 
((NamedTableNode)from).getName().getSchemaName();
+                if(schemaName==null){
+                    schemaName=connection.getSchema();
+                }
+                
if(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName)){
+                    isSystemTable=true;
+                }
+            }
+        }
+        QueryLogger queryLogger = 
QueryLogger.getInstance(connection,isSystemTable);
+        QueryLoggerUtil.logInitialDetails(queryLogger, 
connection.getTenantId(),
+                connection.getQueryServices(), sql, 
queryLogger.getStartTime(), getParameters());
+        return queryLogger;
+    }
+    
     @Override
     public ResultSet executeQuery(String sql) throws SQLException {
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Execute query: " + sql, 
connection));
         }
+        
         CompilableStatement stmt = parseStatement(sql);
         if (stmt.getOperation().isMutation()) {
             throw new ExecuteQueryNotApplicableException(sql);
         }
-        return executeQuery(stmt);
+        return executeQuery(stmt,createQueryLogger(stmt,sql));
     }
 
     @Override
@@ -1709,7 +1745,8 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
             flushIfNecessary();
             return false;
         }
-        executeQuery(stmt);
+        
+        executeQuery(stmt,createQueryLogger(stmt,sql));
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
new file mode 100644
index 0000000..5792658
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogLevel.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum LogLevel {
+    OFF, INFO, DEBUG, TRACE
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
new file mode 100644
index 0000000..817f9ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/LogWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Used by the event handler to write RingBufferEvent, this is done in a 
seperate thread from the application configured
+ * during disruptor
+ */
+public interface LogWriter {
+    /**
+     * Called by ring buffer event handler to write RingBufferEvent
+     * 
+     * @param event
+     * @throws SQLException
+     * @throws IOException
+     */
+    void write(RingBufferEvent event) throws SQLException, IOException;
+
+    /**
+     * will be called when disruptor is getting shutdown
+     * 
+     * @throws IOException
+     */
+
+    void close() throws IOException;
+
+    /**
+     * if writer is closed and cannot write further event
+     * 
+     * @return
+     */
+    boolean isClosed();
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
new file mode 100644
index 0000000..ee6b2d6
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.Sequence;
+import com.lmax.disruptor.SequenceReportingEventHandler;
+
+
+public class QueryLogDetailsEventHandler implements 
SequenceReportingEventHandler<RingBufferEvent>, LifecycleAware {
+    private Sequence sequenceCallback;
+    private LogWriter logWriter;
+
+    public QueryLogDetailsEventHandler(Configuration configuration) throws 
SQLException{
+        this.logWriter = new TableLogWriter(configuration);
+    }
+    
+    @Override
+    public void setSequenceCallback(final Sequence sequenceCallback) {
+        this.sequenceCallback = sequenceCallback;
+    }
+
+    @Override
+    public void onEvent(final RingBufferEvent event, final long sequence, 
final boolean endOfBatch) throws Exception {
+        logWriter.write(event);
+        event.clear();
+    }
+
+    @Override
+    public void onStart() {
+    }
+
+    @Override
+    public void onShutdown() {
+        try {
+            if (logWriter != null) {
+                logWriter.close();
+            }
+        } catch (Exception e) {
+            //Ignore
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
new file mode 100644
index 0000000..87de267
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogInfo.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
+
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarchar;
+
+
+public enum QueryLogInfo {
+    
+    CLIENT_IP_I(CLIENT_IP, QueryLogState.STARTED, LogLevel.INFO, 
PVarchar.INSTANCE),
+    QUERY_I(QUERY,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
+    BIND_PARAMETERS_I(BIND_PARAMETERS,QueryLogState.STARTED, 
LogLevel.TRACE,PVarchar.INSTANCE),
+    QUERY_ID_I(QUERY_ID,QueryLogState.STARTED, 
LogLevel.INFO,PVarchar.INSTANCE),
+    TENANT_ID_I(TENANT_ID,QueryLogState.STARTED, 
LogLevel.INFO,PVarchar.INSTANCE),
+    START_TIME_I(START_TIME,QueryLogState.STARTED, 
LogLevel.INFO,PTimestamp.INSTANCE),
+    USER_I(USER,QueryLogState.STARTED, LogLevel.INFO,PVarchar.INSTANCE),
+    EXPLAIN_PLAN_I(EXPLAIN_PLAN,QueryLogState.COMPILED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
+    GLOBAL_SCAN_DETAILS_I(GLOBAL_SCAN_DETAILS,QueryLogState.COMPILED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
+    NO_OF_RESULTS_ITERATED_I(NO_OF_RESULTS_ITERATED,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PLong.INSTANCE),
+    EXCEPTION_TRACE_I(EXCEPTION_TRACE,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
+    QUERY_STATUS_I(QUERY_STATUS,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PVarchar.INSTANCE),
+    TOTAL_EXECUTION_TIME_I(TOTAL_EXECUTION_TIME,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PLong.INSTANCE),
+    SCAN_METRICS_JSON_I(SCAN_METRICS_JSON,QueryLogState.COMPLETED, 
LogLevel.DEBUG,PVarchar.INSTANCE);
+    
+    public final String columnName;
+    public final QueryLogState logState;
+    public final LogLevel logLevel;
+    public final PDataType dataType;
+
+    private QueryLogInfo(String columnName, QueryLogState logState, LogLevel 
logLevel, PDataType dataType) {
+        this.columnName = columnName;
+        this.logState=logState;
+        this.logLevel=logLevel;
+        this.dataType=dataType;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public QueryLogState getLogState() {
+        return logState;
+    }
+
+    public LogLevel getLogLevel() {
+        return logLevel;
+    }
+
+    public PDataType getDataType() {
+        return dataType;
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
new file mode 100644
index 0000000..e27f0e8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+public enum QueryLogState {
+    STARTED, PLAN, COMPILED, EXECUTION, COMPLETED,FAILED 
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
new file mode 100644
index 0000000..b2fb235
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.util.internal.ThreadLocalRandom;
+
+/*
+ * Wrapper for query translator
+ */
+public class QueryLogger {
+    private final ThreadLocal<RingBufferEventTranslator> threadLocalTranslator 
= new ThreadLocal<>();
+    private QueryLoggerDisruptor queryDisruptor;
+    private String queryId;
+    private Long startTime;
+    private LogLevel logLevel;
+    private static final Log LOG = 
LogFactory.getLog(QueryLoggerDisruptor.class);
+    
+    private QueryLogger(PhoenixConnection connection) {
+        this.queryId = UUID.randomUUID().toString();
+        this.queryDisruptor = 
connection.getQueryServices().getQueryDisruptor();
+        this.startTime = System.currentTimeMillis();
+        logLevel = connection.getLogLevel();
+    }
+    
+    private QueryLogger() {
+        logLevel = LogLevel.OFF;
+    }
+    
+    private RingBufferEventTranslator getCachedTranslator() {
+        RingBufferEventTranslator result = threadLocalTranslator.get();
+        if (result == null) {
+            result = new RingBufferEventTranslator(queryId);
+            threadLocalTranslator.set(result);
+        }
+        return result;
+    }
+    
+    private static final QueryLogger NO_OP_INSTANCE = new QueryLogger() {
+        @Override
+        public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, 
Object> map) {
+
+        }
+        
+        @Override
+        public boolean isDebugEnabled(){
+            return false;
+        }
+        
+        @Override
+        public boolean isInfoEnabled(){
+            return false;
+        }
+    };
+
+    public static QueryLogger getInstance(PhoenixConnection connection, 
boolean isSystemTable) {
+        if (connection.getLogLevel() == LogLevel.OFF || isSystemTable || 
ThreadLocalRandom.current()
+                .nextDouble() > connection.getLogSamplingRate()) { return 
NO_OP_INSTANCE; }
+        return new QueryLogger(connection);
+    }
+
+    /**
+     * Add query log in the table, columns will be logged depending upon the 
connection logLevel
+     * @param logState State of the query
+     * @param map Value of the map should be in format of the corresponding 
data type 
+     */
+    public void log(QueryLogState logState, ImmutableMap<QueryLogInfo, Object> 
map) {
+        final RingBufferEventTranslator translator = getCachedTranslator();
+        translator.setQueryInfo(logState, map, logLevel);
+        publishLogs(translator);
+    }
+    
+    private boolean publishLogs(RingBufferEventTranslator translator) {
+        if (queryDisruptor == null) { return false; }
+        boolean isLogged = queryDisruptor.tryPublish(translator);
+        if (!isLogged && LOG.isDebugEnabled()) {
+            LOG.debug("Unable to write query log in table as ring buffer queue 
is full!!");
+        }
+        return isLogged;
+    }
+
+    /**
+     * Start time when the logger was started, if {@link LogLevel#OFF} then 
it's the current time
+     */
+    public Long getStartTime() {
+        return startTime != null ? startTime : System.currentTimeMillis();
+    }
+    
+    /**
+     *  Is debug logging currently enabled?
+     *  Call this method to prevent having to perform expensive operations 
(for example, String concatenation) when the log level is more than debug.
+     */
+    public boolean isDebugEnabled(){
+        return isLevelEnabled(LogLevel.DEBUG);
+    }
+    
+    private boolean isLevelEnabled(LogLevel logLevel){
+        return this.logLevel != null ? logLevel.ordinal() <= 
this.logLevel.ordinal() : false;
+    }
+    
+    /**
+     * Is Info logging currently enabled?
+     * Call this method to prevent having to perform expensive operations (for 
example, String concatenation) when the log level is more than info.
+     * @return
+     */
+    public boolean isInfoEnabled(){
+        return isLevelEnabled(LogLevel.INFO);
+    }
+
+    /**
+     * Return queryId of the current query logger , needed by the application 
+     * to correlate with the logging table.
+     * Eg(usage):-
+     * StatementContext context = ((PhoenixResultSet)rs).getContext();
+     * String queryId = context.getQueryLogger().getQueryId();
+     * 
+     * @return
+     */
+    public String getQueryId() {
+        return this.queryId;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
new file mode 100644
index 0000000..e9ae6bd
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDefaultExceptionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import com.lmax.disruptor.ExceptionHandler;
+
+class QueryLoggerDefaultExceptionHandler implements 
ExceptionHandler<RingBufferEvent> {
+
+    @Override
+    public void handleEventException(Throwable ex, long sequence, 
RingBufferEvent event) {
+        final StringBuilder sb = new StringBuilder(512);
+        sb.append("Query Logger error handling event 
seq=").append(sequence).append(", value='");
+        try {
+            sb.append(event);
+        } catch (final Exception ignored) {
+            sb.append("[ERROR calling 
").append(event.getClass()).append(".toString(): ");
+            sb.append(ignored).append("]");
+        }
+        sb.append("':");
+        System.err.println(sb);
+        ex.printStackTrace();
+    }
+
+    @Override
+    public void handleOnStartException(final Throwable throwable) {
+        System.err.println("QueryLogger error starting:");
+        throwable.printStackTrace();
+    }
+
+    @Override
+    public void handleOnShutdownException(final Throwable throwable) {
+        System.err.println("QueryLogger error shutting down:");
+        throwable.printStackTrace();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
new file mode 100644
index 0000000..b548d6c
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+public class QueryLoggerDisruptor implements Closeable{
+    
+    private volatile Disruptor<RingBufferEvent> disruptor;
+    private boolean isClosed = false;
+    //number of elements to create within the ring buffer.
+    private static final int RING_BUFFER_SIZE = 256 * 1024;
+    private static final Log LOG = 
LogFactory.getLog(QueryLoggerDisruptor.class);
+    private static final String DEFAULT_WAIT_STRATEGY = 
BlockingWaitStrategy.class.getName();
+    
+    public QueryLoggerDisruptor(Configuration configuration) throws 
SQLException{
+        WaitStrategy waitStrategy;
+        try {
+            waitStrategy = (WaitStrategy)Class
+                    
.forName(configuration.get(QueryServices.LOG_BUFFER_WAIT_STRATEGY, 
DEFAULT_WAIT_STRATEGY)).newInstance();
+        } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
+            throw new SQLException(e); 
+        }
+        
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("QueryLogger" + "-thread-%s")
+                .setDaemon(true)
+                .setThreadFactory(new ThreadFactory() {
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        final Thread result = 
Executors.defaultThreadFactory().newThread(r);
+                        
result.setContextClassLoader(QueryLoggerDisruptor.class.getClass().getClassLoader());
+                        return result;
+                    }
+                })
+                .build();
+        disruptor = new Disruptor<RingBufferEvent>(RingBufferEvent.FACTORY,
+                configuration.getInt(QueryServices.LOG_BUFFER_SIZE, 
RING_BUFFER_SIZE), threadFactory, ProducerType.MULTI,
+                waitStrategy);
+        final ExceptionHandler<RingBufferEvent> errorHandler = new 
QueryLoggerDefaultExceptionHandler();
+        disruptor.setDefaultExceptionHandler(errorHandler);
+
+        final QueryLogDetailsEventHandler[] handlers = { new 
QueryLogDetailsEventHandler(configuration) };
+        disruptor.handleEventsWith(handlers);
+        LOG.info("Starting  QueryLoggerDisruptor for with ringbufferSize=" + 
disruptor.getRingBuffer().getBufferSize()
+                + ", waitStrategy=" + waitStrategy.getClass().getSimpleName() 
+ ", " + "exceptionHandler="
+                + errorHandler + "...");
+        disruptor.start();
+        
+    }
+    
+    /**
+     * Attempts to publish an event by translating (write) data 
representations into events claimed from the RingBuffer.
+     * @param translator
+     * @return
+     */
+    public boolean tryPublish(final EventTranslator<RingBufferEvent> 
translator) {
+        if(isClosed()){
+            return false;
+        }
+        return disruptor.getRingBuffer().tryPublishEvent(translator);
+    }
+    
+
+    public boolean isClosed() {
+        return isClosed ;
+    }
+
+    @Override
+    public void close() throws IOException {
+        isClosed = true;
+        LOG.info("Shutting down QueryLoggerDisruptor..");
+        try {
+            //we can wait for 2 seconds, so that backlog can be committed
+            disruptor.shutdown(2, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            throw new IOException(e);
+        }
+
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
new file mode 100644
index 0000000..2f22931
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PName;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
+public class QueryLoggerUtil {
+
+    public static void logInitialDetails(QueryLogger queryLogger, PName 
tenantId,
+            ConnectionQueryServices queryServices, String query, long 
startTime, List<Object> bindParameters) {
+        queryLogger.log(QueryLogState.STARTED,
+                getInitialDetails(tenantId, queryServices, query, startTime, 
bindParameters));
+
+    }
+
+    private static ImmutableMap<QueryLogInfo, Object> getInitialDetails(PName 
tenantId,
+            ConnectionQueryServices queryServices, String query, long 
startTime, List<Object> bindParameters) {
+        Builder<QueryLogInfo, Object> queryLogBuilder = ImmutableMap.builder();
+        String clientIP;
+        try {
+            clientIP = InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException e) {
+            clientIP = "UnknownHost";
+        }
+        queryLogBuilder.put(QueryLogInfo.CLIENT_IP_I, clientIP);
+        queryLogBuilder.put(QueryLogInfo.QUERY_I, query);
+        queryLogBuilder.put(QueryLogInfo.START_TIME_I, startTime);
+        if (bindParameters != null) {
+            queryLogBuilder.put(QueryLogInfo.BIND_PARAMETERS_I, 
StringUtils.join(bindParameters,","));
+        }
+        if (tenantId != null) {
+            queryLogBuilder.put(QueryLogInfo.TENANT_ID_I, 
tenantId.getString());
+        }
+        queryLogBuilder.put(QueryLogInfo.USER_I, queryServices.getUserName() 
!= null ? queryServices.getUserName()
+                : queryServices.getUser().getShortName());
+        return queryLogBuilder.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
new file mode 100644
index 0000000..96e4bf9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEvent.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import com.google.common.collect.ImmutableMap;
+import com.lmax.disruptor.EventFactory;
+
+ class RingBufferEvent {
+    private String queryId;
+    private QueryLogState logState;
+    private LogLevel connectionLogLevel;
+    private ImmutableMap<QueryLogInfo, Object> queryInfo;
+    
+    public static final Factory FACTORY = new Factory();
+    
+    /**
+     * Creates the events that will be put in the RingBuffer.
+     */
+    private static class Factory implements EventFactory<RingBufferEvent> {
+        @Override
+        public RingBufferEvent newInstance() {
+            final RingBufferEvent result = new RingBufferEvent();
+            return result;
+        }
+    }
+
+    public void clear() {
+        this.logState=null;
+        this.queryInfo=null;
+        this.queryId=null;
+    }
+
+   
+    public String getQueryId() {
+        return queryId;
+    }
+
+    public static Factory getFactory() {
+        return FACTORY;
+    }
+    
+    public QueryLogState getLogState() {
+        return logState;
+    }
+
+    public void setQueryInfo(ImmutableMap<QueryLogInfo, Object> queryInfo) {
+        this.queryInfo=queryInfo;
+        
+    }
+
+    public void setQueryId(String queryId) {
+        this.queryId=queryId;
+        
+    }
+
+    public ImmutableMap<QueryLogInfo, Object> getQueryInfo() {
+        return queryInfo;
+        
+    }
+
+    public void setLogState(QueryLogState logState) {
+        this.logState=logState;
+        
+    }
+
+
+    public LogLevel getConnectionLogLevel() {
+        return connectionLogLevel;
+    }
+
+
+    public void setConnectionLogLevel(LogLevel connectionLogLevel) {
+        this.connectionLogLevel = connectionLogLevel;
+    }
+
+    
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
new file mode 100644
index 0000000..653ddd6
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/log/RingBufferEventTranslator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import com.google.common.collect.ImmutableMap;
+import com.lmax.disruptor.EventTranslator;
+
+class RingBufferEventTranslator implements EventTranslator<RingBufferEvent> {
+    private String queryId;
+    private QueryLogState logState;
+    private ImmutableMap<QueryLogInfo, Object> queryInfo;
+    private LogLevel connectionLogLevel;
+    
+    public RingBufferEventTranslator(String queryId) {
+        this.queryId=queryId;
+    }
+
+    @Override
+    public void translateTo(RingBufferEvent event, long sequence) {
+        event.setQueryId(queryId);
+        event.setQueryInfo(queryInfo);
+        event.setLogState(logState);
+        event.setConnectionLogLevel(connectionLogLevel);
+        clear();
+    }
+
+    private void clear() {
+        setQueryInfo(null,null,null);
+    }
+   
+    public void setQueryInfo(QueryLogState logState, 
ImmutableMap<QueryLogInfo, Object> queryInfo,
+            LogLevel connectionLogLevel) {
+        this.queryInfo = queryInfo;
+        this.logState = logState;
+        this.connectionLogLevel = connectionLogLevel;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/28d74053/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
new file mode 100644
index 0000000..4398596
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Writes RingBuffer log event into table 
+ * 
+ */
+public class TableLogWriter implements LogWriter {
+    private static final Log LOG = LogFactory.getLog(LogWriter.class);
+    private HConnection connection;
+    private boolean isClosed;
+    private HTableInterface table;
+    private Configuration config;
+
+    public TableLogWriter(Configuration configuration) {
+        this.config = configuration;
+        try {
+            this.connection = 
HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config);
+            table = HBaseFactoryProvider.getHTableFactory()
+                    .getTable(SchemaUtil
+                            .getPhysicalTableName(
+                                    
SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE), config)
+                            .getName(), connection, null);
+        } catch (Exception e) {
+            LOG.warn("Unable to initiate LogWriter for writing query logs to 
table");
+        }
+    }
+
+    @Override
+    public void write(RingBufferEvent event) throws SQLException, IOException {
+        if(isClosed()){
+            LOG.warn("Unable to commit query log as Log committer is already 
closed");
+            return;
+        }
+        if (table == null || connection == null) {
+            LOG.warn("Unable to commit query log as connection was not 
initiated ");
+            return;
+        }
+        ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        Put put =new Put(Bytes.toBytes(event.getQueryId()));
+        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
+            if (entry.getKey().logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()) {
+                LiteralExpression expression = 
LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType,
+                        Determinism.ALWAYS);
+                expression.evaluate(null, ptr);
+                put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes(entry.getKey().columnName),
+                        ByteUtil.copyKeyBytesIfNecessary(ptr));
+            }
+        }
+        
+        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()
+                && (event.getLogState() == QueryLogState.COMPLETED || 
event.getLogState() == QueryLogState.FAILED)) {
+            LiteralExpression expression = 
LiteralExpression.newConstant(event.getLogState().toString(),
+                    QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS);
+            expression.evaluate(null, ptr);
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                    Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), 
ByteUtil.copyKeyBytesIfNecessary(ptr));
+        }
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        table.put(put);
+        
+    }
+    
+    @Override
+    public void close() throws IOException {
+        if(isClosed()){
+            return;
+        }
+        isClosed=true;
+        try {
+            if (table != null) {
+                table.close();
+            }
+            if (connection != null && !connection.isClosed()) {
+                //It should internally close all the statements
+                connection.close();
+            }
+        } catch (IOException e) {
+            // TODO Ignore?
+        }
+    }
+    
+    public boolean isClosed(){
+        return isClosed;
+    }
+
+}

Reply via email to