Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 aec5101d9 -> 5e3d432d9


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
new file mode 100644
index 0000000..c102855
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.log;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * LogWriter implementation that stores one RingBufferEvent as a single HBase Put in the
+ * table named by SYSTEM_CATALOG_SCHEMA / SYSTEM_LOG_TABLE.
+ * <p>
+ * Constructor: creates an HBase Connection from the given Configuration and obtains the
+ * physical log table from it. Any Exception raised while doing so is caught and only a
+ * warning is logged, so an instance can exist with a null table and/or connection.
+ * <p>
+ * write(event): returns after logging a warning when the writer is already closed or when
+ * the table or connection is null. Otherwise it builds a Put whose row key is the bytes of
+ * event.getQueryId(). Each entry of event.getQueryInfo() is added only when the ordinal of
+ * the entry key's logLevel is less than or equal to the ordinal of
+ * event.getConnectionLogLevel(); the value is serialized through a LiteralExpression of the
+ * entry key's dataType and stored in the default column family under the entry key's
+ * columnName. The QUERY_STATUS_I column is added under the same level check, and only when
+ * the event's log state is COMPLETED or FAILED. The empty column is always added, then the
+ * Put is sent with table.put.
+ * <p>
+ * close(): the first call sets the closed flag, closes the table (if any) and then the
+ * connection (if any and not already closed). An IOException raised while closing is
+ * caught and not propagated. Later calls return immediately.
+ * <p>
+ * NOTE(review): the closed flag is a plain boolean field; nothing in this class shows
+ * whether write and close may be called from different threads - confirm with the caller.
+ */
+public class TableLogWriter implements LogWriter {
+    private static final Log LOG = LogFactory.getLog(LogWriter.class); // NOTE(review): logger is keyed on LogWriter.class rather than TableLogWriter.class - confirm intended
+    private Connection connection;
+    private boolean isClosed; // set to true by the first close() call
+    private Table table;
+    private Configuration config;
+
+    public TableLogWriter(Configuration configuration) {
+        this.config = configuration;
+        try {
+            this.connection = 
ConnectionFactory.createConnection(configuration);
+            table = this.connection.getTable(SchemaUtil.getPhysicalTableName(
+                    SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_SCHEMA, 
SYSTEM_LOG_TABLE), config));
+        } catch (Exception e) { // NOTE(review): the caught exception is not passed to the logger, so its cause is not logged
+            LOG.warn("Unable to initiate LogWriter for writing query logs to 
table");
+        }
+    }
+
+    @Override
+    public void write(RingBufferEvent event) throws SQLException, IOException {
+        if(isClosed()){
+            LOG.warn("Unable to commit query log as Log committer is already 
closed");
+            return;
+        }
+        if (table == null || connection == null) {
+            LOG.warn("Unable to commit query log as connection was not 
initiated ");
+            return;
+        }
+        ImmutableMap<QueryLogInfo, Object> queryInfo=event.getQueryInfo();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        Put put =new Put(Bytes.toBytes(event.getQueryId()));
+        for (Entry<QueryLogInfo, Object> entry : queryInfo.entrySet()) {
+            if (entry.getKey().logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()) {
+                LiteralExpression expression = 
LiteralExpression.newConstant(entry.getValue(), entry.getKey().dataType,
+                        Determinism.ALWAYS);
+                expression.evaluate(null, ptr);
+                put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
Bytes.toBytes(entry.getKey().columnName),
+                        ByteUtil.copyKeyBytesIfNecessary(ptr));
+            }
+        }
+        
+        if (QueryLogInfo.QUERY_STATUS_I.logLevel.ordinal() <= 
event.getConnectionLogLevel().ordinal()
+                && (event.getLogState() == QueryLogState.COMPLETED || 
event.getLogState() == QueryLogState.FAILED)) {
+            LiteralExpression expression = 
LiteralExpression.newConstant(event.getLogState().toString(),
+                    QueryLogInfo.QUERY_STATUS_I.dataType, Determinism.ALWAYS);
+            expression.evaluate(null, ptr);
+            put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                    Bytes.toBytes(QueryLogInfo.QUERY_STATUS_I.columnName), 
ByteUtil.copyKeyBytesIfNecessary(ptr));
+        }
+        put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, 
QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+        table.put(put);
+        
+    }
+    
+    @Override
+    public void close() throws IOException {
+        if(isClosed()){
+            return;
+        }
+        isClosed=true;
+        try {
+            if (table != null) {
+                table.close();
+            }
+            if (connection != null && !connection.isClosed()) {
+                // Close the connection last, after the table that was obtained from it
+                connection.close();
+            }
+        } catch (IOException e) {
+            // NOTE(review): an IOException from closing is silently dropped here - TODO decide whether to log or rethrow it
+        }
+    }
+    
+    public boolean isClosed(){
+        return isClosed;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
index 4fd1194..c008635 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java
@@ -1,27 +1,21 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the 
License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.monitoring;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
@@ -44,6 +38,8 @@ public class ReadMetricQueue {
 
     private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap 
= new ConcurrentHashMap<>();
 
+    private final List<ScanMetricsHolder> scanMetricsHolderList = new 
ArrayList<ScanMetricsHolder>();
+
     private final boolean isRequestMetricsEnabled;
 
     public ReadMetricQueue(boolean isRequestMetricsEnabled) {
@@ -85,7 +81,7 @@ public class ReadMetricQueue {
         }
         return publishedMetrics;
     }
-    
+
     public void clearMetrics() {
         metricsMap.clear(); // help gc
     }
@@ -177,8 +173,18 @@ public class ReadMetricQueue {
         return q;
     }
 
-       public boolean isRequestMetricsEnabled() {
-               return isRequestMetricsEnabled;
-       }
+    public boolean isRequestMetricsEnabled() {
+        return isRequestMetricsEnabled;
+    }
+    
+    public void addScanHolder(ScanMetricsHolder holder){ // appends holder to scanMetricsHolderList (a plain ArrayList); no synchronization in this method
+        scanMetricsHolderList.add(holder);
+    }
+
+    public List<ScanMetricsHolder> getScanMetricsHolderList() { // returns the internal list itself, not a copy or an unmodifiable view
+        return scanMetricsHolderList;
+    }
+    
+    
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index 6bcd402..9125cd8 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -17,20 +17,23 @@
  */
 package org.apache.phoenix.monitoring;
 
-import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS;
+import static 
org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
+import static 
org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
 import static 
org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
 import static 
org.apache.phoenix.monitoring.MetricType.COUNT_NOT_SERVING_REGION_EXCEPTION;
-import static 
org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
-import static 
org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_CALLS;
+import static 
org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
+import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
 
-import org.apache.hadoop.hbase.client.Scan;
+import java.io.IOException;
+import java.util.Map;
 
-import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
-import static 
org.apache.phoenix.monitoring.MetricType.COUNT_REMOTE_RPC_RETRIES;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
-import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_FILTERED;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.JsonMapper;
 
 public class ScanMetricsHolder {
 
@@ -45,9 +48,11 @@ public class ScanMetricsHolder {
     private final CombinableMetric countOfRemoteRPCRetries;
     private final CombinableMetric countOfRowsScanned;
     private final CombinableMetric countOfRowsFiltered;
+    private  Map<String, Long> scanMetricMap;
+    private Object scan;
 
     private static final ScanMetricsHolder NO_OP_INSTANCE =
-            new ScanMetricsHolder(new ReadMetricQueue(false), "");
+            new ScanMetricsHolder(new ReadMetricQueue(false), "",null);
 
     public static ScanMetricsHolder getInstance(ReadMetricQueue readMetrics, 
String tableName,
             Scan scan, boolean isRequestMetricsEnabled) {
@@ -55,10 +60,12 @@ public class ScanMetricsHolder {
             return NO_OP_INSTANCE;
         }
         scan.setScanMetricsEnabled(true);
-        return new ScanMetricsHolder(readMetrics, tableName);
+        return new ScanMetricsHolder(readMetrics, tableName, scan);
     }
 
-    private ScanMetricsHolder(ReadMetricQueue readMetrics, String tableName) {
+    private ScanMetricsHolder(ReadMetricQueue readMetrics, String 
tableName,Scan scan) {
+        readMetrics.addScanHolder(this);
+        this.scan=scan;
         countOfRPCcalls = readMetrics.allotMetric(COUNT_RPC_CALLS, tableName);
         countOfRemoteRPCcalls = 
readMetrics.allotMetric(COUNT_REMOTE_RPC_CALLS, tableName);
         sumOfMillisSecBetweenNexts = 
readMetrics.allotMetric(COUNT_MILLS_BETWEEN_NEXTS, tableName);
@@ -118,4 +125,21 @@ public class ScanMetricsHolder {
         return countOfRowsScanned;
     }
 
+    public Map<String, Long> getScanMetricMap() {
+        return scanMetricMap;
+    }
+
+    public void setScanMetricMap(Map<String, Long> scanMetricMap) {
+        this.scanMetricMap = scanMetricMap;
+    }
+    
+    @Override
+    public String toString() { // JSON-style summary holding the scan and the scan metric map
+        try {
+            return "{\"scan\":" + scan + ", \"scanMetrics\":" + 
JsonMapper.writeObjectAsString(scanMetricMap) + "}"; // NOTE(review): scan is appended through its own string form, unquoted - confirm that this yields valid JSON
+        } catch (IOException e) { // a serialization failure is reported inside the returned string instead of being thrown
+            return "{\"Exception while converting scan metrics to Json\":\"" + 
e.getMessage() + "\"}";
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 90f8089..0b72ada 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
@@ -146,4 +148,8 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
     void upgradeSystemTables(String url, Properties props) throws SQLException;
     
     public Configuration getConfiguration();
+
+    public User getUser();
+
+    public QueryLoggerDisruptor getQueryDisruptor();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 6df2f80..8c7441a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -124,6 +124,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -183,6 +184,7 @@ import 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.protobuf.ProtobufUtil;
@@ -267,6 +269,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     // don't need.
     private final ReadOnlyProps props;
     private final String userName;
+    private final User user;
     private final 
ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
     private final GuidePostsCache tableStatsCache;
 
@@ -336,6 +339,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                     return hbaseVersion >= 
PhoenixDatabaseMetaData.MIN_RENEW_LEASE_VERSION;
                 }
             });
+    private QueryLoggerDisruptor queryDisruptor;
 
     private PMetaData newEmptyMetaData() {
         return new PSynchronizedMetaData(new 
PMetaDataImpl(INITIAL_META_DATA_TABLE_CAPACITY, getProps()));
@@ -372,6 +376,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         ConfigUtil.setReplicationConfigIfAbsent(this.config);
         this.props = new ReadOnlyProps(this.config.iterator());
         this.userName = connectionInfo.getPrincipal();
+        this.user = connectionInfo.getUser();
         this.latestMetaData = newEmptyMetaData();
         // TODO: should we track connection wide memory usage or just org-wide 
usage?
         // If connection-wide, create a MemoryManager here, otherwise just use 
the one from the delegate
@@ -396,6 +401,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         this.maxConnectionsAllowed = 
config.getInt(QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS,
             
QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS);
         this.shouldThrottleNumConnections = (maxConnectionsAllowed > 0);
+        try {
+            this.queryDisruptor = new QueryLoggerDisruptor(this.config);
+        } catch (SQLException e) {
+            logger.warn("Unable to initiate qeuery logging service !!");
+            e.printStackTrace();
+        }
 
     }
 
@@ -477,6 +488,13 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
             closed = true;
             GLOBAL_QUERY_SERVICES_COUNTER.decrement();
+            try {
+                if (this.queryDisruptor != null) {
+                    this.queryDisruptor.close();
+                }
+            } catch (Exception e) {
+                // Ignore
+            }
             SQLException sqlE = null;
             try {
                 // Attempt to return any unused sequences.
@@ -2611,7 +2629,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         try {
             
metaConnection.createStatement().execute(QueryConstants.CREATE_FUNCTION_METADATA);
         } catch (TableAlreadyExistsException ignore) {}
-
+        try {
+            
metaConnection.createStatement().execute(QueryConstants.CREATE_LOG_METADATA);
+        } catch (TableAlreadyExistsException ignore) {}
         // Catch the IOException to log the error message and then bubble it 
up for the client to retry.
         try {
             createSysMutexTableIfNotExists(hbaseAdmin, 
ConnectionQueryServicesImpl.this.getProps());
@@ -2966,6 +2986,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             try {
                 
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
             } catch (NewerTableAlreadyExistsException e) {} catch 
(TableAlreadyExistsException e) {}
+            try {
+                
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA);
+            } catch (NewerTableAlreadyExistsException e) {} catch 
(TableAlreadyExistsException e) {}
             ConnectionQueryServicesImpl.this.upgradeRequired.set(false);
             success = true;
         } catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
@@ -4063,6 +4086,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public String getUserName() {
         return userName;
     }
+    
+    @Override
+    public User getUser() {
+        return user;
+    }
 
     private void checkClosed() {
         if (closed) {
@@ -4488,4 +4516,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public Configuration getConfiguration() {
         return config;
     }
+
+    @Override
+    public QueryLoggerDisruptor getQueryDisruptor() {
+        return this.queryDisruptor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index c510b5a..ad354d1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -55,6 +56,7 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.schema.FunctionNotFoundException;
@@ -112,10 +114,13 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     private final Map<String, List<HRegionLocation>> tableSplits = 
Maps.newHashMap();
     private final GuidePostsCache guidePostsCache;
     private final Configuration config;
+
+    private User user;
     
     public ConnectionlessQueryServicesImpl(QueryServices services, 
ConnectionInfo connInfo, Properties info) {
         super(services);
         userName = connInfo.getPrincipal();
+        user = connInfo.getUser();
         metaData = newEmptyMetaData();
 
         // Use KeyValueBuilder that builds real KeyValues, as our test utils 
require this
@@ -328,6 +333,9 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
                    
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_FUNCTION_METADATA);
                 } catch (NewerTableAlreadyExistsException ignore) {
                 }
+                try {
+                    
metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_LOG_METADATA);
+                } catch (NewerTableAlreadyExistsException ignore) {}
             } catch (SQLException e) {
                 sqlE = e;
             } finally {
@@ -664,4 +672,14 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public Configuration getConfiguration() {
         return config;
     }
+
+    @Override
+    public User getUser() {
+        return user;
+    }
+
+    @Override
+    public QueryLoggerDisruptor getQueryDisruptor() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 05d1af6..f5c8a59 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.log.QueryLoggerDisruptor;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.schema.PColumn;
@@ -351,4 +353,16 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     public Configuration getConfiguration() {
         return getDelegate().getConfiguration();
     }
+
+    @Override
+    public User getUser() {
+        return getDelegate().getUser();
+    }
+
+    @Override
+    public QueryLoggerDisruptor getQueryDisruptor() {
+        return getDelegate().getQueryDisruptor();
+    }
+    
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 7607388..ae12e01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -109,6 +109,20 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLIENT_IP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXCEPTION_TRACE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.EXPLAIN_PLAN;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GLOBAL_SCAN_DETAILS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NO_OF_RESULTS_ITERATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_ID;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TOTAL_EXECUTION_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USER;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
 
 import java.math.BigDecimal;
 
@@ -124,6 +138,7 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableProperty;
 
 
 /**
@@ -395,10 +410,40 @@ public interface QueryConstants {
             // Install split policy to prevent a tenant's metadata from being 
split across regions.
             HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "',\n" + 
             PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+    
+    public static final String CREATE_LOG_METADATA = // DDL that creates the SYSTEM_LOG_TABLE table in SYSTEM_CATALOG_SCHEMA
+            "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_LOG_TABLE 
+ "\"(\n" +
+             // Tenant, query id, user and client IP; only QUERY_ID is listed in the PRIMARY KEY constraint below
+            TENANT_ID + " VARCHAR ," +
+            QUERY_ID + " VARCHAR NOT NULL,\n" +
+            USER + " VARCHAR , \n" +
+            CLIENT_IP + " VARCHAR, \n" +
+            // The query text and its explain plan
+            QUERY +  " VARCHAR, \n" +
+            EXPLAIN_PLAN + " VARCHAR, \n" +
+            // Per-execution columns: start time, total execution time, result count, status, exception trace, scan details, bind parameters, scan metrics
+            START_TIME + " TIMESTAMP, \n" +
+            TOTAL_EXECUTION_TIME + " BIGINT, \n" +
+            NO_OF_RESULTS_ITERATED + " BIGINT, \n" +
+            QUERY_STATUS + " VARCHAR, \n" +
+            EXCEPTION_TRACE + " VARCHAR, \n" +
+            GLOBAL_SCAN_DETAILS + " VARCHAR, \n" +
+            BIND_PARAMETERS + " VARCHAR, \n" +
+            SCAN_METRICS_JSON + " VARCHAR, \n" +
+            " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY 
(QUERY_ID))\n" +
+            HConstants.VERSIONS + "= " + MetaDataProtocol.DEFAULT_LOG_VERSIONS 
+ ",\n" +
+            HColumnDescriptor.KEEP_DELETED_CELLS + "="  + 
MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n"+
+            // Install split policy to prevent a tenant's metadata from being 
split across regions.
+            HTableDescriptor.SPLIT_POLICY + "='" + 
MetaDataSplitPolicy.class.getName() + "',\n" + 
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE+ ",\n" 
+ 
+            HColumnDescriptor.TTL + "=" + 
MetaDataProtocol.DEFAULT_LOG_TTL+",\n"+
+            TableProperty.COLUMN_ENCODED_BYTES.toString()+" = 0"; // presumably so that qualifiers are the plain column names that TableLogWriter writes - TODO confirm
+    
     public static final byte[] OFFSET_FAMILY = "f_offset".getBytes();
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
     public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
     public static final String HASH_JOIN_CACHE_RETRIES = 
"hashjoin.client.retries.number";
     public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5;
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0b18aaa..43b9e5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -301,6 +301,10 @@ public interface QueryServices extends SQLCloseable {
     // Whether to enable cost-based-decision in the query optimizer
     public static final String COST_BASED_OPTIMIZER_ENABLED = 
"phoenix.costbased.optimizer.enabled";
     public static final String SMALL_SCAN_THRESHOLD_ATTRIB = 
"phoenix.query.smallScanThreshold";
+    public static final String LOG_LEVEL = "phoenix.log.level";
+    public static final String LOG_BUFFER_SIZE = "phoenix.log.buffer.size";
+    public static final String LOG_BUFFER_WAIT_STRATEGY = 
"phoenix.log.wait.strategy";
+    public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate";
 
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 961ab9f..58c9812 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -47,6 +47,8 @@ import static 
org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLE
 import static 
org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE;
 import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.LOG_LEVEL;
+import static org.apache.phoenix.query.QueryServices.LOG_SAMPLE_RATE;
 import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
@@ -107,6 +109,7 @@ import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableRefFactory;
@@ -347,6 +350,8 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false;
 
     public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
+    public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name();
+    public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0";
 
     private final Configuration config;
 
@@ -428,7 +433,9 @@ public class QueryServicesOptions {
             .setIfUnset(USE_STATS_FOR_PARALLELIZATION, 
DEFAULT_USE_STATS_FOR_PARALLELIZATION)
             .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, 
DEFAULT_COST_BASED_OPTIMIZER_ENABLED)
             .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, 
DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
-            .setIfUnset(PHOENIX_ACLS_ENABLED,  DEFAULT_PHOENIX_ACLS_ENABLED);
+            .setIfUnset(PHOENIX_ACLS_ENABLED,  DEFAULT_PHOENIX_ACLS_ENABLED)
+            .setIfUnset(LOG_LEVEL,  DEFAULT_LOGGING_LEVEL)
+            .setIfUnset(LOG_SAMPLE_RATE,  DEFAULT_LOG_SAMPLE_RATE);
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
         // it to 1, so we'll change it.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e3d432d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 16de885..4d6084c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
     <jackson.version>1.9.2</jackson.version>
     <antlr.version>3.5.2</antlr.version>
     <log4j.version>1.2.17</log4j.version>
+    <disruptor.version>3.3.6</disruptor.version>
     <slf4j.version>1.6.4</slf4j.version>
     <protobuf-java.version>2.5.0</protobuf-java.version>
     <commons-io.version>2.1</commons-io.version>
@@ -952,6 +953,11 @@
         <artifactId>javax.servlet-api</artifactId>
         <version>${servlet.api.version}</version>
       </dependency>
+       <dependency>
+        <groupId>com.lmax</groupId>
+        <artifactId>disruptor</artifactId>
+        <version>${disruptor.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

Reply via email to