Repository: phoenix
Updated Branches:
  refs/heads/master 1c3e9495d -> 05ff5618d


PHOENIX-2430 Update stats table asynchronously


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/05ff5618
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/05ff5618
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/05ff5618

Branch: refs/heads/master
Commit: 05ff5618d4e630ed5e5c8160a3dc4628c0b7f5d6
Parents: 1c3e949
Author: Samarth <samarth.j...@salesforce.com>
Authored: Tue Dec 22 12:08:40 2015 -0800
Committer: Samarth <samarth.j...@salesforce.com>
Committed: Tue Dec 22 12:08:40 2015 -0800

----------------------------------------------------------------------
 .../StatisticsCollectionRunTrackerIT.java       | 170 ++++++++++
 .../org/apache/phoenix/cache/GlobalCache.java   |   6 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   3 +-
 .../UngroupedAggregateRegionObserver.java       | 324 ++++++++++++++-----
 .../org/apache/phoenix/query/QueryServices.java |   3 +
 .../phoenix/query/QueryServicesOptions.java     |  17 +
 .../apache/phoenix/schema/MetaDataClient.java   |   8 +-
 .../stats/StatisticsCollectionRunTracker.java   | 130 ++++++++
 .../schema/stats/StatisticsCollector.java       |   8 +-
 .../phoenix/schema/stats/StatisticsScanner.java | 115 ++++---
 .../phoenix/schema/stats/StatisticsWriter.java  |   3 -
 .../phoenix/query/QueryServicesTestImpl.java    |   6 +-
 12 files changed, 654 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
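
In short: UPDATE STATISTICS and post-split stats writes now run on a
region-server-local executor, guarded by the new
StatisticsCollectionRunTracker singleton so that a concurrent UPDATE
STATISTICS command or a major compaction never collects stats for the
same region twice. Three server-side properties are introduced (see
QueryServices.java below). A minimal sketch of overriding their
defaults, assuming a plain HBase Configuration on the region server;
the property names are the constants added in this patch, the values
are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class StatsAsyncConfigSketch {
        public static Configuration create() {
            Configuration conf = HBaseConfiguration.create();
            // default true: UPDATE STATISTICS returns immediately (row count 0)
            conf.setBoolean("phoenix.update.stats.command.async", false);
            // default true: stats after a region split are written asynchronously
            conf.setBoolean("phoenix.stats.commit.async", false);
            // default 4: threads in the tracker's executor
            conf.setInt("phoenix.stats.pool.size", 8);
            return conf;
        }
    }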


http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
new file mode 100644
index 0000000..bf567f0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
+import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class StatisticsCollectionRunTrackerIT extends BaseOwnClusterHBaseManagedTimeIT {
+    private static final StatisticsCollectionRunTracker tracker = StatisticsCollectionRunTracker
+            .getInstance(new Configuration());
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testStateBeforeAndAfterUpdateStatsCommand() throws Exception {
+        String tableName = "testStateBeforeAndAfterUpdateStatsCommand".toUpperCase();
+        HRegionInfo regionInfo = createTableAndGetRegion(tableName);
+        StatisticsCollectionRunTracker tracker =
+                StatisticsCollectionRunTracker.getInstance(new Configuration());
+        // assert that the region wasn't already being tracked (it is added now)
+        assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo));
+        // assert that removing the region from the tracker works
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo));
+        runUpdateStats(tableName);
+        // assert that after update stats is complete, the tracker isn't tracking the region any more
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo));
+    }
+    
+    @Test
+    public void testStateBeforeAndAfterMajorCompaction() throws Exception {
+        String tableName = "testStateBeforeAndAfterMajorCompaction".toUpperCase();
+        HRegionInfo regionInfo = createTableAndGetRegion(tableName);
+        StatisticsCollectionRunTracker tracker =
+                StatisticsCollectionRunTracker.getInstance(new Configuration());
+        // Upsert values in the table.
+        String keyPrefix = "aaaaaaaaaaaaaaaaaaaa";
+        String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            for (int i = 0; i < 1000; i++) {
+                stmt.setString(1, keyPrefix + i);
+                stmt.setString(2, "KV" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+        }
+        // assert that the region wasn't already being tracked as compacting (it is added now)
+        assertTrue(tracker.addCompactingRegion(regionInfo));
+        // assert that removing the region from the tracker works
+        assertTrue(tracker.removeCompactingRegion(regionInfo));
+        runMajorCompaction(tableName);
+        
+        // assert that after major compaction is complete, the tracker isn't tracking the region any more
+        assertFalse(tracker.removeCompactingRegion(regionInfo));
+    }
+    
+    @Test
+    public void testMajorCompactionPreventsUpdateStatsFromRunning() throws Exception {
+        String tableName = "testMajorCompactionPreventsUpdateStatsFromRunning".toUpperCase();
+        HRegionInfo regionInfo = createTableAndGetRegion(tableName);
+        // simulate stats collection via major compaction by marking the region as compacting in the tracker
+        markRegionAsCompacting(regionInfo);
+        Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT, runUpdateStats(tableName));
+        StatisticsCollectionRunTracker tracker =
+                StatisticsCollectionRunTracker.getInstance(new Configuration());
+        // assert that the tracker state was cleared.
+        assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo));
+    }
+    
+    @Test
+    public void testUpdateStatsPreventsAnotherUpdateStatsFromRunning() throws Exception {
+        String tableName = "testUpdateStatsPreventsAnotherUpdateStatsFromRunning".toUpperCase();
+        HRegionInfo regionInfo = createTableAndGetRegion(tableName);
+        markRunningUpdateStats(regionInfo);
+        Assert.assertEquals("Row count didn't match", CONCURRENT_UPDATE_STATS_ROW_COUNT,
+            runUpdateStats(tableName));
+        
+        // assert that the concurrent, race-losing update stats run didn't clear the region
+        // from the tracker: removeUpdateStatsCommandRegion() returning true means the tracker
+        // was still tracking the region, which is what we expect here.
+        assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo));
+    }
+    
+    private void markRegionAsCompacting(HRegionInfo regionInfo) {
+        StatisticsCollectionRunTracker tracker =
+                StatisticsCollectionRunTracker.getInstance(new Configuration());
+        tracker.addCompactingRegion(regionInfo);
+    }
+
+    private void markRunningUpdateStats(HRegionInfo regionInfo) {
+        StatisticsCollectionRunTracker tracker =
+                StatisticsCollectionRunTracker.getInstance(new Configuration());
+        tracker.addUpdateStatsCommandRegion(regionInfo);
+    }
+
+    private HRegionInfo createTableAndGetRegion(String tableName) throws Exception {
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR PRIMARY KEY, KV1 VARCHAR)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+            try (HBaseAdmin admin = phxConn.getQueryServices().getAdmin()) {
+                List<HRegionInfo> tableRegions = admin.getTableRegions(tableNameBytes);
+                return tableRegions.get(0);
+            }
+        }
+    }
+    
+    private long runUpdateStats(String tableName) throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            return conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName);
+        }
+    }
+    
+    private void runMajorCompaction(String tableName) throws Exception {
+        try (PhoenixConnection conn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
+            try (HBaseAdmin admin = conn.getQueryServices().getAdmin()) {
+                TableName t = TableName.valueOf(tableName);
+                admin.flush(t);
+                admin.majorCompact(t);
+                Thread.sleep(10000);
+            }
+        }
+    }
+}
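
A note on the sentinel row counts the tests above assert on: the patch
surfaces them to clients through the UPDATE STATISTICS update count. A
client-side sketch, assuming a reachable Phoenix JDBC URL in jdbcUrl (a
hypothetical variable) and a hypothetical table MY_TABLE:

    // -100 (CONCURRENT_UPDATE_STATS_ROW_COUNT): lost the race to another
    // UPDATE STATISTICS already running on the region.
    // -200 (COMPACTION_UPDATE_STATS_ROW_COUNT): skipped because a major
    // compaction is already collecting stats for the region.
    try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
        long rows = conn.createStatement().executeUpdate("UPDATE STATISTICS MY_TABLE");
        if (rows == -100L || rows == -200L) {
            // stats were not rebuilt by this command; retry later if needed
        }
    }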

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index 643112d..0a0de89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -32,11 +32,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.ChildMemoryManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
-import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PMetaDataEntity;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.SizedUtil;
 
 import com.google.common.cache.Cache;
@@ -53,13 +51,13 @@ import com.google.common.cache.Weigher;
  * @since 0.1
  */
 public class GlobalCache extends TenantCacheImpl {
-    private static GlobalCache INSTANCE; 
+    private static volatile GlobalCache INSTANCE; 
     
     private final Configuration config;
     // TODO: Use Guava cache with auto removal after lack of access 
     private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
     // Cache for latest PTable for a given Phoenix table
-    private Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache;
+    private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache;
     
     public void clearTenantCache() {
         perTenantCacheMap.clear();
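
The volatile qualifiers added above matter because both fields are
lazily initialized and read without a lock. A minimal illustrative
class (not Phoenix code) showing the double-checked-locking idiom that
volatile makes safe; it is the same shape
StatisticsCollectionRunTracker.getInstance() uses later in this patch:

    public final class LazySingleton {
        // Without volatile, a reader could observe a non-null reference
        // to a partially constructed instance under the Java memory model.
        private static volatile LazySingleton INSTANCE;

        public static LazySingleton getInstance() {
            LazySingleton result = INSTANCE; // one volatile read on the fast path
            if (result == null) {
                synchronized (LazySingleton.class) {
                    result = INSTANCE;
                    if (result == null) {
                        INSTANCE = result = new LazySingleton();
                    }
                }
            }
            return result;
        }
    }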

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d720806..236c2dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -93,6 +93,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";
     public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY";
     public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER";
+    public static final String RUN_UPDATE_STATS_ASYNC = "_RunUpdateStatsAsync";
     
     /**
     * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -170,7 +171,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             if (!isRegionObserverFor(scan)) {
                 return s;
             }
-            boolean success =false;
+            boolean success = false;
             // Save the current span. When done with the child span, reset the span back to
             // what it was. Otherwise, this causes the thread local storing the current span
             // to not be reset back to null causing catastrophic infinite loops
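
The new RUN_UPDATE_STATS_ASYNC scan attribute is how an analyze scan can
be flagged for background execution; collectStats() in
UngroupedAggregateRegionObserver (below) decodes it with
Bytes.toBoolean(). A hypothetical client-side sketch of setting it:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    public class AnalyzeScanSketch {
        public static Scan analyzeScan() {
            Scan scan = new Scan();
            // serialized as a boolean; the server reads it with Bytes.toBoolean()
            scan.setAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC,
                Bytes.toBytes(true));
            return scan;
        }
    }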

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index bd21e25..2bd9a5e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,7 +21,11 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC;
+import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
+import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -35,7 +39,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
+import co.cask.tephra.TxConstants;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
@@ -88,6 +96,7 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PBinary;
@@ -95,6 +104,7 @@ import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -108,8 +118,7 @@ import org.apache.phoenix.util.TimeKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.TxConstants;
-
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -120,7 +129,7 @@ import com.google.common.collect.Sets;
  *
  * @since 0.1
  */
-public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
+public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver {
     // TODO: move all constants into a single class
     public static final String UNGROUPED_AGG = "UngroupedAgg";
     public static final String DELETE_AGG = "DeleteAgg";
@@ -173,14 +182,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
 
     @Override
     protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException {
-        Region region = c.getEnvironment().getRegion();
+        RegionCoprocessorEnvironment env = c.getEnvironment();
+        Region region = env.getRegion();
         long ts = scan.getTimeRange().getMax();
-        StatisticsCollector stats = null;
-        if(ScanUtil.isAnalyzeTable(scan)) {
-            byte[] gp_width_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES);
-            byte[] gp_per_region_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION);
+        if (ScanUtil.isAnalyzeTable(scan)) {
+            byte[] gp_width_bytes =
+                    scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES);
+            byte[] gp_per_region_bytes =
+                    scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION);
             // Let this throw, as this scan is being done for the sole purpose of collecting stats
-            stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes);
+            StatisticsCollector statsCollector =
+                    new StatisticsCollector(env, region.getRegionInfo().getTable()
+                            .getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes);
+            return collectStats(s, statsCollector, region, scan, env.getConfiguration());
         }
         int offsetToBe = 0;
         if (ScanUtil.isLocalIndex(scan)) {
@@ -212,9 +226,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes);
         List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024);
-
+        
         RegionScanner theScanner = s;
-
+        
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         List<Expression> selectExpressions = null;
         byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
@@ -248,41 +262,39 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) {
             if (dataColumns != null) {
                 tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
-                dataRegion = IndexUtil.getDataRegion(c.getEnvironment());
+                dataRegion = IndexUtil.getDataRegion(env);
                 viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
-                    getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
+                    getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, 
                             dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr);
-        }
-
+        } 
+        
         if (j != null)  {
-            theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment());
+            theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env);
         }
-
+        
         int batchSize = 0;
         List<Mutation> mutations = Collections.emptyList();
         boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
         if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
             // TODO: size better
             mutations = Lists.newArrayListWithExpectedSize(1024);
-            batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
         }
         Aggregators aggregators = ServerAggregators.deserialize(
-                scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration());
+                scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), env.getConfiguration());
         Aggregator[] rowAggregators = aggregators.getAggregators();
         boolean hasMore;
         boolean hasAny = false;
         MultiKeyValueTuple result = new MultiKeyValueTuple();
         if (logger.isDebugEnabled()) {
-               logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
+            logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
         }
         long rowCount = 0;
         final RegionScanner innerScanner = theScanner;
         region.startRegionOperation();
-        boolean updateStats = stats != null;
-        boolean success = false;
         try {
             synchronized (innerScanner) {
                 do {
@@ -291,9 +303,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                     // since this is an indication of whether or not there are more values after the
                     // ones returned
                     hasMore = innerScanner.nextRaw(results);
-                    if (updateStats) {
-                        stats.collectStatistics(results);
-                    }
                     if (!results.isEmpty()) {
                         rowCount++;
                         result.setKeyValues(results);
@@ -313,8 +322,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                         // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case
                                         if (field.getDataType().isArrayType()) {
                                             field.getDataType().coerceBytes(ptr, null, field.getDataType(),
-                                                    field.getMaxLength(), field.getScale(), field.getSortOrder(), 
-                                                    field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
+                                                field.getMaxLength(), field.getScale(), field.getSortOrder(), 
+                                                field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
                                         }
                                         // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters
                                         else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
@@ -323,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                                 len--;
                                             }
                                             ptr.set(ptr.get(), ptr.getOffset(), len);
-                                        // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
+                                            // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
                                         } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
                                             byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
                                             ptr.set(invertedBytes);
@@ -342,8 +351,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                 }
                                 writeToTable.newKey(ptr, values);
                                 if (Bytes.compareTo(
-                                        firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), 
-                                        ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
+                                    firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), 
+                                    ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
                                     continue;
                                 }
                                 byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
@@ -357,21 +366,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                 for (Cell cell : results) {
                                     // Copy existing cell but with new row key
                                     Cell newCell = new KeyValue(newRow, 0, newRow.length,
-                                            cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-                                            cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                                            cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
-                                            cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                                        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                                        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                                        cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+                                        cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                                     switch (KeyValue.Type.codeToType(cell.getTypeByte())) {
                                     case Put:
                                         // If Put, point delete old Put
                                         Delete del = new Delete(oldRow);
                                         del.addDeleteMarker(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-                                                cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-                                                cell.getQualifierArray(), cell.getQualifierOffset(),
-                                                cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete,
-                                                ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
+                                            cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                                            cell.getQualifierArray(), cell.getQualifierOffset(),
+                                            cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete,
+                                            ByteUtil.EMPTY_BYTE_ARRAY, 0, 0));
                                         mutations.add(del);
-                                        
+
                                         Put put = new Put(newRow);
                                         put.add(newCell);
                                         mutations.add(put);
@@ -391,13 +400,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                     if (!results.isEmpty()) {
                                         result.getKey(ptr);
                                         ValueGetter valueGetter =
-                                            maintainer.createGetterFromKeyValues(
-                                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
-                                                results);
+                                                maintainer.createGetterFromKeyValues(
+                                                    ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+                                                    results);
                                         Put put = maintainer.buildUpdateMutation(kvBuilder,
                                             valueGetter, ptr, ts,
-                                            c.getEnvironment().getRegion().getRegionInfo().getStartKey(),
-                                            c.getEnvironment().getRegion().getRegionInfo().getEndKey());
+                                            env.getRegion().getRegionInfo().getStartKey(),
+                                            env.getRegion().getRegionInfo().getEndKey());
                                         indexMutations.add(put);
                                     }
                                 }
@@ -406,12 +415,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                 // FIXME: the version of the Delete constructor without the lock
                                 // args was introduced in 0.94.4, thus if we try to use it here
                                 // we can no longer use the 0.94.2 version of the client.
-                              Cell firstKV = results.get(0);
-                              Delete delete = new Delete(firstKV.getRowArray(),
-                                  firstKV.getRowOffset(), firstKV.getRowLength(),ts);
-                              mutations.add(delete);
-                              // force tephra to ignore this deletes
-                              delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+                                Cell firstKV = results.get(0);
+                                Delete delete = new Delete(firstKV.getRowArray(),
+                                    firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+                                mutations.add(delete);
+                                // force tephra to ignore these deletes
+                                delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                             } else if (isUpsert) {
                                 Arrays.fill(values, null);
                                 int i = 0;
@@ -423,7 +432,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                         // If SortOrder from expression in SELECT doesn't match the
                                         // column being projected into then invert the bits.
                                         if (expression.getSortOrder() !=
-                                            projectedColumns.get(i).getSortOrder()) {
+                                                projectedColumns.get(i).getSortOrder()) {
                                             SortOrder.invert(values[i], 0, values[i], 0,
                                                 values[i].length);
                                         }
@@ -436,20 +445,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                     if (expression.evaluate(result, ptr)) {
                                         PColumn column = projectedColumns.get(i);
                                         Object value = expression.getDataType()
-                                            .toObject(ptr, column.getSortOrder());
+                                                .toObject(ptr, column.getSortOrder());
                                         // We are guaranteed that the two columns will have the
                                         // same type.
                                         if (!column.getDataType().isSizeCompatible(ptr, value,
-                                                column.getDataType(), expression.getMaxLength(),
-                                                expression.getScale(), column.getMaxLength(),
-                                                column.getScale())) {
+                                            column.getDataType(), expression.getMaxLength(),
+                                            expression.getScale(), column.getMaxLength(),
+                                            column.getScale())) {
                                             throw new DataExceedsCapacityException(
                                                 column.getDataType(), column.getMaxLength(),
                                                 column.getScale());
                                         }
                                         column.getDataType().coerceBytes(ptr, value,
                                             expression.getDataType(), expression.getMaxLength(),
-                                            expression.getScale(), expression.getSortOrder(),
+                                            expression.getScale(), expression.getSortOrder(), 
                                             column.getMaxLength(), column.getScale(),
                                             column.getSortOrder(), projectedTable.rowKeyOrderOptimizable());
                                         byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
@@ -486,7 +495,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                                  * We insert one empty key value per row per timestamp.
                                  */
                                 Set<Long> timeStamps =
-                                    Sets.newHashSetWithExpectedSize(results.size());
+                                        Sets.newHashSetWithExpectedSize(results.size());
                                 for (Cell kv : results) {
                                     long kvts = kv.getTimestamp();
                                     if (!timeStamps.contains(kvts)) {
@@ -512,37 +521,25 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                         } catch (ConstraintViolationException e) {
                             // Log and ignore in count
                             logger.error(LogUtil.addCustomAnnotations("Failed to create row in " +
-                                region.getRegionInfo().getRegionNameAsString() + " with values " +
-                                SchemaUtil.toString(values),
-                                ScanUtil.getCustomAnnotations(scan)), e);
+                                    region.getRegionInfo().getRegionNameAsString() + " with values " +
+                                    SchemaUtil.toString(values),
+                                    ScanUtil.getCustomAnnotations(scan)), e);
                             continue;
                         }
                         aggregators.aggregate(rowAggregators, result);
                         hasAny = true;
                     }
                 } while (hasMore);
-                success = true;
             }
         } finally {
             try {
-                if (success && updateStats) {
-                    try {
-                        stats.updateStatistic(region);
-                    } finally {
-                        stats.close();
-                    }
-                }
+                innerScanner.close();
             } finally {
-                try {
-                    innerScanner.close();
-                } finally {
-                    region.closeRegionOperation();
-                }
+                region.closeRegionOperation();
             }
         }
-
         if (logger.isDebugEnabled()) {
-               logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
+            logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
         }
 
         if (!mutations.isEmpty()) {
@@ -589,7 +586,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
 
             @Override
             public long getMaxResultSize() {
-               return scan.getMaxResultSize();
+                return scan.getMaxResultSize();
             }
 
             @Override
@@ -647,7 +644,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                         : StatisticsCollector.NO_TIMESTAMP;
                 StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(),
                         clientTimeStamp, store.getFamily().getName());
-                internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner,
+                internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner,
                         mergeRegions);
             } catch (IOException e) {
                 // If we can't reach the stats table, don't interrupt the normal
@@ -663,12 +660,30 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
 
     @Override
     public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> e, final Region l,
-        final Region r) throws IOException {
-        final Region region = e.getEnvironment().getRegion();
+            final Region r) throws IOException {
+        final Configuration config = e.getEnvironment().getConfiguration();
+        boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
+        if (!async) {
+            splitStatsInternal(e, l, r);
+        } else {
+            StatisticsCollectionRunTracker.getInstance(config).runTask(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    splitStatsInternal(e, l, r);
+                    return null;
+                }
+            });
+        }
+    }
+
+    private void splitStatsInternal(final ObserverContext<RegionCoprocessorEnvironment> e,
+            final Region l, final Region r) {
+        final RegionCoprocessorEnvironment env = e.getEnvironment();
+        final Region region = env.getRegion();
         final TableName table = region.getRegionInfo().getTable();
         try {
             boolean useCurrentTime =
-                    e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+                    env.getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
                             QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
             // Provides a means of clients controlling their timestamps to not use current time
             // when background tasks are updating stats. Instead we track the max timestamp of
@@ -678,7 +693,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
               @Override
              public Void run() throws Exception {
-                StatisticsCollector stats = new StatisticsCollector(e.getEnvironment(),
+                StatisticsCollector stats = new StatisticsCollector(env,
                   table.getNameAsString(), clientTimeStamp);
                 try {
                   stats.splitStats(region, l, r);
@@ -695,6 +710,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
         }
     }
 
+
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -703,6 +719,152 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
             throw new RuntimeException(e);
         }
     }
+    
+    private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
+            final Region region, final Scan scan, Configuration config) throws IOException {
+        StatsCollectionCallable callable =
+                new StatsCollectionCallable(stats, region, innerScanner, config);
+        byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC);
+        boolean async = false;
+        if (asyncBytes != null) {
+            async = Bytes.toBoolean(asyncBytes);
+        }
+        long rowCount = 0; // in case of async, we report 0 as the number of rows updated
+        StatisticsCollectionRunTracker statsRunTracker =
+                StatisticsCollectionRunTracker.getInstance(config);
+        boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo());
+        if (runUpdateStats) {
+            if (!async) {
+                rowCount = callable.call();
+            } else {
+                statsRunTracker.runTask(callable);
+            }
+        } else {
+            rowCount = CONCURRENT_UPDATE_STATS_ROW_COUNT;
+            logger.info("UPDATE STATISTICS didn't run because another UPDATE STATISTICS command was already running on the region "
+                    + region.getRegionInfo().getRegionNameAsString());
+        }
+        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+        final KeyValue aggKeyValue =
+                KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+        RegionScanner scanner = new BaseRegionScanner() {
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return region.getRegionInfo();
+            }
+
+            @Override
+            public boolean isFilterDone() {
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException {
+                // no-op because we want to manage closing of the inner scanner ourselves.
+            }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                results.add(aggKeyValue);
+                return false;
+            }
+
+            @Override
+            public long getMaxResultSize() {
+                return scan.getMaxResultSize();
+            }
+
+            @Override
+            public int getBatch() {
+                return innerScanner.getBatch();
+            }
+        };
+        return scanner;
+    }
+
+    /**
+     * 
+     * Callable to encapsulate the collection of stats triggered by the
+     * UPDATE STATISTICS command.
+     *
+     * Package private for tests.
+     */
+    static class StatsCollectionCallable implements Callable<Long> {
+        private final StatisticsCollector stats;
+        private final Region region;
+        private final RegionScanner innerScanner;
+        private final Configuration config;
+
+        StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs,
+                Configuration config) {
+            this.stats = s;
+            this.region = r;
+            this.innerScanner = rs;
+            this.config = config;
+        }
+
+        @Override
+        public Long call() throws IOException {
+            return collectStatsInternal();
+        }
+
+        private boolean areStatsBeingCollectedViaCompaction() {
+            return StatisticsCollectionRunTracker.getInstance(config)
+                    .areStatsBeingCollectedOnCompaction(region.getRegionInfo());
+        }
+
+        private long collectStatsInternal() throws IOException {
+            long startTime = System.currentTimeMillis();
+            region.startRegionOperation();
+            boolean hasMore = false;
+            boolean noErrors = false;
+            boolean compactionRunning = areStatsBeingCollectedViaCompaction();
+            long rowCount = 0;
+            try {
+                if (!compactionRunning) {
+                    synchronized (innerScanner) {
+                        do {
+                            List<Cell> results = new ArrayList<Cell>();
+                            hasMore = innerScanner.nextRaw(results);
+                            stats.collectStatistics(results);
+                            rowCount++;
+                            compactionRunning = areStatsBeingCollectedViaCompaction();
+                        } while (hasMore && !compactionRunning);
+                        noErrors = true;
+                    }
+                }
+                return compactionRunning ? COMPACTION_UPDATE_STATS_ROW_COUNT : rowCount;
+            } catch (IOException e) {
+                logger.error("IOException in update stats: " + Throwables.getStackTraceAsString(e));
+                throw e;
+            } finally {
+                try {
+                    if (noErrors && !compactionRunning) {
+                        stats.updateStatistic(region);
+                        logger.info("UPDATE STATISTICS finished successfully for scanner: "
+                                + innerScanner + ". Number of rows scanned: " + rowCount
+                                + ". Time: " + (System.currentTimeMillis() - startTime));
+                    }
+                    if (compactionRunning) {
+                        logger.info("UPDATE STATISTICS stopped midway because major compaction was running for region "
+                                + region.getRegionInfo().getRegionNameAsString());
+                    }
+                } finally {
+                    try {
+                        StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo());
+                        stats.close();
+                    } finally {
+                        try {
+                            innerScanner.close();
+                        } finally {
+                            region.closeRegionOperation();
+                        }
+                    }
+                }
+            }
+        }
+    }
 
     private static List<Expression> deserializeExpressions(byte[] b) {
         ByteArrayInputStream stream = new ByteArrayInputStream(b);
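
The guard-then-run shape in collectStats() above is the heart of the
patch. A condensed sketch under the tracker API added by this commit;
doCollectStats() is a hypothetical stand-in for the real scan work:

    final StatisticsCollectionRunTracker tracker =
            StatisticsCollectionRunTracker.getInstance(config);
    // Set.add() doubles as the mutual-exclusion test: it returns false
    // if another UPDATE STATISTICS already claimed the region.
    if (tracker.addUpdateStatsCommandRegion(regionInfo)) {
        tracker.runTask(new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                try {
                    return doCollectStats(); // hypothetical
                } finally {
                    // always unmark, so a failed run can't wedge the region
                    tracker.removeUpdateStatsCommandRegion(regionInfo);
                }
            }
        });
    } else {
        // lost the race: report CONCURRENT_UPDATE_STATS_ROW_COUNT instead
    }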

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 1b05334..84983e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -151,6 +151,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
     public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
     public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
+    public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async";
+    public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size";
+    public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
 
     public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
     public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 13a7beb..6401c01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
+import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -54,6 +55,7 @@ import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTR
 import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
 import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
@@ -172,6 +174,9 @@ public class QueryServicesOptions {
     // compression we're getting)
     public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 3* 100 * 1024 *1024;
     public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true;
+    public static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = true;
+    public static final boolean DEFAULT_COMMIT_STATS_ASYNC = true;
+    public static final int DEFAULT_STATS_POOL_SIZE = 4;
 
     public static final boolean DEFAULT_USE_REVERSE_SCAN = true;
 
@@ -247,6 +252,8 @@ public class QueryServicesOptions {
         Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
         QueryServicesOptions options = new QueryServicesOptions(config)
             .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME)
+            .setIfUnset(RUN_UPDATE_STATS_ASYNC, DEFAULT_RUN_UPDATE_STATS_ASYNC)
+            .setIfUnset(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC)
             .setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS)
             .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
             .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
@@ -575,4 +582,14 @@ public class QueryServicesOptions {
         config.set(EXTRA_JDBC_ARGUMENTS_ATTRIB, extraArgs);
         return this;
     }
+
+    public QueryServicesOptions setRunUpdateStatsAsync(boolean flag) {
+        config.setBoolean(RUN_UPDATE_STATS_ASYNC, flag);
+        return this;
+    }
+
+    public QueryServicesOptions setCommitStatsAsync(boolean flag) {
+        config.setBoolean(COMMIT_STATS_ASYNC, flag);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9c365ab..35d61ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -904,7 +904,13 @@ public class MetaDataClient {
                 }
             }
         }
-        return new MutationState((int)rowCount, connection);
+        final long count = rowCount;
+        return new MutationState(1, connection) {
+            @Override
+            public long getUpdateCount() {
+                return count;
+            }
+        };
     }
 
     private long updateStatisticsInternal(PName physicalName, PTable logicalTable, Map<String, Object> statsProps) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
new file mode 100644
index 0000000..5f6be3f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+/**
+ * Singleton that is used to track state associated with regions undergoing stats collection at the
+ * region server's JVM level.
+ */
+public class StatisticsCollectionRunTracker {
+    private static volatile StatisticsCollectionRunTracker INSTANCE;
+    private final Set<HRegionInfo> updateStatsRegions = Collections
+            .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>());
+    private final Set<HRegionInfo> compactingRegions = Collections
+            .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>());
+    private final ExecutorService executor;
+    
+    // Constants added for testing purposes
+    public static final long CONCURRENT_UPDATE_STATS_ROW_COUNT = -100L;
+    public static final long COMPACTION_UPDATE_STATS_ROW_COUNT = -200L;
+    
+    public static StatisticsCollectionRunTracker getInstance(Configuration config) {
+        StatisticsCollectionRunTracker result = INSTANCE;
+        if (result == null) {
+            synchronized (StatisticsCollectionRunTracker.class) {
+                result = INSTANCE;
+                if (result == null) {
+                    INSTANCE = result = new StatisticsCollectionRunTracker(config);
+                }
+            }
+        }
+        return result;
+    }
+
+    private StatisticsCollectionRunTracker(Configuration config) {
+        int poolSize =
+                config.getInt(QueryServices.STATS_SERVER_POOL_SIZE,
+                    QueryServicesOptions.DEFAULT_STATS_POOL_SIZE);
+        executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setDaemon(true);
+                return t;
+            }
+        });
+    }
+
+    /**
+     * @param regionInfo for the region that should be marked as undergoing stats collection via
+     *            major compaction.
+     * @return true if the region wasn't already marked for stats collection via compaction, false
+     *         otherwise.
+     */
+    public boolean addCompactingRegion(HRegionInfo regionInfo) {
+        return compactingRegions.add(regionInfo);
+    }
+
+    /**
+     * @param regionInfo for the region that should be unmarked as undergoing stats collection via
+     *            major compaction.
+     * @return true if the region was marked for stats collection via compaction, false otherwise.
+     */
+    public boolean removeCompactingRegion(HRegionInfo regionInfo) {
+        return compactingRegions.remove(regionInfo);
+    }
+
+    /**
+     * @param regionInfo for the region to check for.
+     * @return true if stats are being collected for the region via major compaction, false
+     *         otherwise.
+     */
+    public boolean areStatsBeingCollectedOnCompaction(HRegionInfo regionInfo) {
+        return compactingRegions.contains(regionInfo);
+    }
+
+    /**
+     * @param regionInfo for the region to run UPDATE STATISTICS command on.
+     * @return true if UPDATE STATISTICS wasn't already running on the region, false otherwise.
+     */
+    public boolean addUpdateStatsCommandRegion(HRegionInfo regionInfo) {
+        return updateStatsRegions.add(regionInfo);
+    }
+
+    /**
+     * @param regionInfo for the region to mark as not running UPDATE STATISTICS command on.
+     * @return true if UPDATE STATISTICS was running on the region, false otherwise.
+     */
+    public boolean removeUpdateStatsCommandRegion(HRegionInfo regionInfo) {
+        return updateStatsRegions.remove(regionInfo);
+    }
+
+    /**
+     * Enqueues the task for execution.
+     * @param <T> the result type of the task
+     * @param c task to execute
+     * @return a Future representing pending completion of the task
+     */
+    public <T> Future<T> runTask(Callable<T> c) {
+        return executor.submit(c);
+    }
+
+}
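
To make the intended call pattern concrete, an illustrative sketch (assumed caller, not code from this commit) of guarding an UPDATE STATISTICS run with the tracker; the sentinel row counts above let tests detect the skipped-run paths:

    // add...() returns false when a run is already in flight for the region.
    StatisticsCollectionRunTracker tracker =
            StatisticsCollectionRunTracker.getInstance(env.getConfiguration());
    HRegionInfo regionInfo = env.getRegion().getRegionInfo();
    if (!tracker.addUpdateStatsCommandRegion(regionInfo)) {
        return CONCURRENT_UPDATE_STATS_ROW_COUNT; // signal the concurrent run
    }
    try {
        // ... collect and commit stats for the region ...
    } finally {
        tracker.removeUpdateStatsCommandRegion(regionInfo);
    }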

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 24e1507..41bdc8b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -216,14 +216,14 @@ public class StatisticsCollector {
         }
     }
 
-    public InternalScanner createCompactionScanner(Region region, Store store, InternalScanner s,
+    public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner s,
             Pair<HRegionInfo, HRegionInfo> mergeRegions) throws IOException {
         // See if this is for Major compaction
         if (logger.isDebugEnabled()) {
             logger.debug("Compaction scanner created for stats");
         }
         ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
-        return getInternalScanner(region, store, s, cfKey, mergeRegions);
+        return getInternalScanner(env, store, s, cfKey, mergeRegions);
     }
 
     public void splitStats(Region parent, Region left, Region right) {
@@ -245,9 +245,9 @@ public class StatisticsCollector {
         }
     }
 
-    protected InternalScanner getInternalScanner(Region region, Store store, InternalScanner internalScan,
+    protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner internalScan,
             ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) {
-        return new StatisticsScanner(this, statsTable, region, internalScan, family, mergeRegions);
+        return new StatisticsScanner(this, statsTable, env, internalScan, family, mergeRegions);
     }
 
     public void clear() {
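
The signature change above is the plumbing for the async path: handing StatisticsScanner the RegionCoprocessorEnvironment rather than just the Region gives it the region server Configuration, which it needs to read COMMIT_STATS_ASYNC and to reach the StatisticsCollectionRunTracker singleton.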

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 761b388..876d024 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -17,16 +17,22 @@
  */
 package org.apache.phoenix.schema.stats;
 
+import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -44,15 +50,18 @@ public class StatisticsScanner implements InternalScanner {
     private StatisticsCollector tracker;
     private ImmutableBytesPtr family;
     private Pair<HRegionInfo, HRegionInfo> mergeRegions;
+    private final Configuration config;
 
-    public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, Region region,
+    public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env,
             InternalScanner delegate, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) {
         this.tracker = tracker;
         this.stats = stats;
         this.delegate = delegate;
-        this.region = region;
+        this.region = env.getRegion();
         this.family = family;
         this.mergeRegions = mergeRegions;
+        this.config = env.getConfiguration();
+        StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(region.getRegionInfo());
     }
 
     @Override
@@ -83,62 +92,80 @@ public class StatisticsScanner implements InternalScanner {
 
     @Override
     public void close() throws IOException {
-        IOException toThrow = null;
-        try {
-            // update the statistics table
-            // Just verify if this if fine
-            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
-            if (mergeRegions != null) {
-                if (mergeRegions.getFirst() != null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Deleting stale stats for the region "
-                                + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction");
-                    }
-                    stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations);
-                }
-                if (mergeRegions.getSecond() != null) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Deleting stale stats for the region "
-                                + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction");
-                    }
-                    stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations);
-                }
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Deleting the stats for the region " + region.getRegionInfo().getRegionNameAsString()
-                        + " as part of major compaction");
-            }
-            stats.deleteStats(region.getRegionInfo().getRegionName(), this.tracker, family, mutations);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Adding new stats for the region " + region.getRegionInfo().getRegionNameAsString()
-                        + " as part of major compaction");
-            }
-            stats.addStats(region.getRegionInfo().getRegionName(), this.tracker, family, mutations);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Committing new stats for the region " + region.getRegionInfo().getRegionNameAsString()
-                        + " as part of major compaction");
-            }
-            stats.commitStats(mutations);
-        } catch (IOException e) {
-            LOG.error("Failed to update statistics table!", e);
-            toThrow = e;
-        } finally {
-            try {
-                stats.close();
-            } catch (IOException e) {
-                if (toThrow == null) toThrow = e;
-                LOG.error("Error while closing the stats table", e);
-            } finally {
-                // close the delegate scanner
-                try {
-                    delegate.close();
-                } catch (IOException e) {
-                    if (toThrow == null) toThrow = e;
-                    LOG.error("Error while closing the scanner", e);
-                } finally {
-                    if (toThrow != null) { throw toThrow; }
-                }
-            }
-        }
+        boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
+        StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config);
+        StatisticsScannerCallable callable = new StatisticsScannerCallable();
+        if (!async) {
+            callable.call();
+        } else {
+            collectionTracker.runTask(callable);
+        }
+    }
+
+    private class StatisticsScannerCallable implements Callable<Void> {
+        @Override
+        public Void call() throws IOException {
+            IOException toThrow = null;
+            StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config);
+            final HRegionInfo regionInfo = region.getRegionInfo();
+            try {
+                // update the statistics table
+                // Just verify if this is fine
+                ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+                if (mergeRegions != null) {
+                    if (mergeRegions.getFirst() != null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Deleting stale stats for the region "
+                                    + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction");
+                        }
+                        stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations);
+                    }
+                    if (mergeRegions.getSecond() != null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Deleting stale stats for the region "
+                                    + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction");
+                        }
+                        stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations);
+                    }
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString()
+                            + " as part of major compaction");
+                }
+                stats.deleteStats(regionInfo.getRegionName(), tracker, family, mutations);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString()
+                            + " as part of major compaction");
+                }
+                stats.addStats(regionInfo.getRegionName(), tracker, family, mutations);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString()
+                            + " as part of major compaction");
+                }
+                stats.commitStats(mutations);
+            } catch (IOException e) {
+                LOG.error("Failed to update statistics table!", e);
+                toThrow = e;
+            } finally {
+                try {
+                    collectionTracker.removeCompactingRegion(regionInfo);
+                    stats.close();
+                } catch (IOException e) {
+                    if (toThrow == null) toThrow = e;
+                    LOG.error("Error while closing the stats table", e);
+                } finally {
+                    // close the delegate scanner
+                    try {
+                        delegate.close();
+                    } catch (IOException e) {
+                        if (toThrow == null) toThrow = e;
+                        LOG.error("Error while closing the scanner", e);
+                    } finally {
+                        if (toThrow != null) { throw toThrow; }
+                    }
+                }
+            }
+            return null;
+        }
+    }
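
Design note on the rewritten close(): with COMMIT_STATS_ASYNC false the callable runs inline, so close() keeps its original throw-on-failure semantics; with it true, the work moves to the tracker's daemon pool and failures surface only through the returned Future. An illustrative fragment (assumed caller, not in this commit) of how such a Future could be awaited:

    // ExecutionException wraps any IOException thrown inside call().
    Future<Void> commit = collectionTracker.runTask(callable);
    try {
        commit.get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IOException(e);
    }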
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 834675c..22d1ab7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -24,11 +24,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 5289ab9..215110c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -55,6 +55,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
     public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0;
     public static final boolean DEFAULT_EXPLAIN_CHUNK_COUNT = false; // TODO: update explain plans in test and set to true
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+    private static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = false;
+    private static final boolean DEFAULT_COMMIT_STATS_ASYNC = false;
 
     
     /**
@@ -93,7 +95,9 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE)
                 .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE)
                 .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER)
-                .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS);
+                .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS)
+                .setRunUpdateStatsAsync(DEFAULT_RUN_UPDATE_STATS_ASYNC)
+                .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC);
     }
     
     public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {
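
Both flags default to false here so unit tests see deterministic, synchronous stats behavior. A test that needs the async paths can flip them per connection; illustrative only, with the property-name constants taken from QueryServices:

    Properties props = new Properties();
    props.setProperty(QueryServices.RUN_UPDATE_STATS_ASYNC, Boolean.TRUE.toString());
    props.setProperty(QueryServices.COMMIT_STATS_ASYNC, Boolean.TRUE.toString());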
