Repository: phoenix Updated Branches: refs/heads/master 1c3e9495d -> 05ff5618d
PHOENIX-2430 Update stats table asynchronously Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/05ff5618 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/05ff5618 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/05ff5618 Branch: refs/heads/master Commit: 05ff5618d4e630ed5e5c8160a3dc4628c0b7f5d6 Parents: 1c3e949 Author: Samarth <samarth.j...@salesforce.com> Authored: Tue Dec 22 12:08:40 2015 -0800 Committer: Samarth <samarth.j...@salesforce.com> Committed: Tue Dec 22 12:08:40 2015 -0800 ---------------------------------------------------------------------- .../StatisticsCollectionRunTrackerIT.java | 170 ++++++++++ .../org/apache/phoenix/cache/GlobalCache.java | 6 +- .../coprocessor/BaseScannerRegionObserver.java | 3 +- .../UngroupedAggregateRegionObserver.java | 324 ++++++++++++++----- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 17 + .../apache/phoenix/schema/MetaDataClient.java | 8 +- .../stats/StatisticsCollectionRunTracker.java | 130 ++++++++ .../schema/stats/StatisticsCollector.java | 8 +- .../phoenix/schema/stats/StatisticsScanner.java | 115 ++++--- .../phoenix/schema/stats/StatisticsWriter.java | 3 - .../phoenix/query/QueryServicesTestImpl.java | 6 +- 12 files changed, 654 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java new file mode 100644 index 0000000..bf567f0 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class StatisticsCollectionRunTrackerIT extends BaseOwnClusterHBaseManagedTimeIT { + private static final StatisticsCollectionRunTracker tracker = StatisticsCollectionRunTracker + .getInstance(new Configuration()); + @BeforeClass + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testStateBeforeAndAfterUpdateStatsCommand() throws Exception { + String tableName = "testStateBeforeAndAfterUpdateStatsCommand".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // assert that the region wasn't added to the tracker + assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo)); + // assert that removing the region from the tracker works + assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo)); + runUpdateStats(tableName); + // assert that after update stats is complete, tracker isn't tracking the region any more + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo)); + } + + @Test + public void testStateBeforeAndAfterMajorCompaction() throws Exception { + String tableName = "testStateBeforeAndAfterMajorCompaction".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // Upsert values in the table. + String keyPrefix = "aaaaaaaaaaaaaaaaaaaa"; + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(upsert); + for (int i = 0; i < 1000; i++) { + stmt.setString(1, keyPrefix + i); + stmt.setString(2, "KV" + i); + stmt.executeUpdate(); + } + conn.commit(); + } + // assert that the region wasn't added to the tracker + assertTrue(tracker.addCompactingRegion(regionInfo)); + // assert that removing the region from the tracker works + assertTrue(tracker.removeCompactingRegion(regionInfo)); + runMajorCompaction(tableName); + + // assert that after major compaction is complete, tracker isn't tracking the region any more + assertFalse(tracker.removeCompactingRegion(regionInfo)); + } + + @Test + public void testMajorCompactionPreventsUpdateStatsFromRunning() throws Exception { + String tableName = "testMajorCompactionPreventsUpdateStatsFromRunning".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + // simulate stats collection via major compaction by marking the region as compacting in the tracker + markRegionAsCompacting(regionInfo); + Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT, runUpdateStats(tableName)); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // assert that the tracker state was cleared. + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo)); + } + + @Test + public void testUpdateStatsPreventsAnotherUpdateStatsFromRunning() throws Exception { + String tableName = "testUpdateStatsPreventsAnotherUpdateStatsFromRunning".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + markRunningUpdateStats(regionInfo); + Assert.assertEquals("Row count didn't match", CONCURRENT_UPDATE_STATS_ROW_COUNT, + runUpdateStats(tableName)); + + // assert that running the concurrent and race-losing update stats didn't clear the region + // from the tracker. If the method returned true it means the tracker was still tracking + // the region. Slightly counter-intuitive, yes. + assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo)); + } + + private void markRegionAsCompacting(HRegionInfo regionInfo) { + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + tracker.addCompactingRegion(regionInfo); + } + + private void markRunningUpdateStats(HRegionInfo regionInfo) { + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + tracker.addUpdateStatsCommandRegion(regionInfo); + } + + private HRegionInfo createTableAndGetRegion(String tableName) throws Exception { + byte[] tableNameBytes = Bytes.toBytes(tableName); + String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR PRIMARY KEY, KV1 VARCHAR)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(ddl); + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + try (HBaseAdmin admin = phxConn.getQueryServices().getAdmin()) { + List<HRegionInfo> tableRegions = admin.getTableRegions(tableNameBytes); + return tableRegions.get(0); + } + } + } + + private long runUpdateStats(String tableName) throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + return conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName); + } + } + + private void runMajorCompaction(String tableName) throws Exception { + try (PhoenixConnection conn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) { + try (HBaseAdmin admin = conn.getQueryServices().getAdmin()) { + TableName t = TableName.valueOf(tableName); + admin.flush(t); + admin.majorCompact(t); + Thread.sleep(10000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index 643112d..0a0de89 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -32,11 +32,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.ChildMemoryManager; import org.apache.phoenix.memory.GlobalMemoryManager; -import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PMetaDataEntity; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.SizedUtil; import com.google.common.cache.Cache; @@ -53,13 +51,13 @@ import com.google.common.cache.Weigher; * @since 0.1 */ public class GlobalCache extends TenantCacheImpl { - private static GlobalCache INSTANCE; + private static volatile GlobalCache INSTANCE; private final Configuration config; // TODO: Use Guava cache with auto removal after lack of access private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>(); // Cache for lastest PTable for a given Phoenix table - private Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; + private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; public void clearTenantCache() { perTenantCacheMap.clear(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index d720806..236c2dc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -93,6 +93,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION"; public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY"; public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER"; + public static final String RUN_UPDATE_STATS_ASYNC = "_RunUpdateStatsAsync"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations @@ -170,7 +171,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { if (!isRegionObserverFor(scan)) { return s; } - boolean success =false; + boolean success = false; // Save the current span. When done with the child span, reset the span back to // what it was. Otherwise, this causes the thread local storing the current span // to not be reset back to null causing catastrophic infinite loops http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index bd21e25..2bd9a5e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -21,7 +21,11 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; +import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC; +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -35,7 +39,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import co.cask.tephra.TxConstants; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; @@ -88,6 +96,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.types.PBinary; @@ -95,6 +104,7 @@ import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -108,8 +118,7 @@ import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import co.cask.tephra.TxConstants; - +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -120,7 +129,7 @@ import com.google.common.collect.Sets; * * @since 0.1 */ -public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ +public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver { // TODO: move all constants into a single class public static final String UNGROUPED_AGG = "UngroupedAgg"; public static final String DELETE_AGG = "DeleteAgg"; @@ -173,14 +182,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { - Region region = c.getEnvironment().getRegion(); + RegionCoprocessorEnvironment env = c.getEnvironment(); + Region region = env.getRegion(); long ts = scan.getTimeRange().getMax(); - StatisticsCollector stats = null; - if(ScanUtil.isAnalyzeTable(scan)) { - byte[] gp_width_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES); - byte[] gp_per_region_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION); + if (ScanUtil.isAnalyzeTable(scan)) { + byte[] gp_width_bytes = + scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES); + byte[] gp_per_region_bytes = + scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION); // Let this throw, as this scan is being done for the sole purpose of collecting stats - stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); + StatisticsCollector statsCollector = + new StatisticsCollector(env, region.getRegionInfo().getTable() + .getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); + return collectStats(s, statsCollector, region, scan, env.getConfiguration()); } int offsetToBe = 0; if (ScanUtil.isLocalIndex(scan)) { @@ -212,9 +226,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024); - + RegionScanner theScanner = s; - + byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); @@ -248,41 +262,39 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); - dataRegion = IndexUtil.getDataRegion(c.getEnvironment()); + dataRegion = IndexUtil.getDataRegion(env); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); } ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); theScanner = - getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, + getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); - } - + } + if (j != null) { - theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment()); + theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env); } - + int batchSize = 0; List<Mutation> mutations = Collections.emptyList(); boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { // TODO: size better mutations = Lists.newArrayListWithExpectedSize(1024); - batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); } Aggregators aggregators = ServerAggregators.deserialize( - scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration()); + scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), env.getConfiguration()); Aggregator[] rowAggregators = aggregators.getAggregators(); boolean hasMore; boolean hasAny = false; MultiKeyValueTuple result = new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); + logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } long rowCount = 0; final RegionScanner innerScanner = theScanner; region.startRegionOperation(); - boolean updateStats = stats != null; - boolean success = false; try { synchronized (innerScanner) { do { @@ -291,9 +303,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // since this is an indication of whether or not there are more values after the // ones returned hasMore = innerScanner.nextRaw(results); - if (updateStats) { - stats.collectStatistics(results); - } if (!results.isEmpty()) { rowCount++; result.setKeyValues(results); @@ -313,8 +322,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case if (field.getDataType().isArrayType()) { field.getDataType().coerceBytes(ptr, null, field.getDataType(), - field.getMaxLength(), field.getScale(), field.getSortOrder(), - field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte + field.getMaxLength(), field.getScale(), field.getSortOrder(), + field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte } // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) { @@ -323,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ len--; } ptr.set(ptr.get(), ptr.getOffset(), len); - // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) + // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) { byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()); ptr.set(invertedBytes); @@ -342,8 +351,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } writeToTable.newKey(ptr, values); if (Bytes.compareTo( - firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), - ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { + firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), + ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { continue; } byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr); @@ -357,21 +366,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ for (Cell cell : results) { // Copy existing cell but with new row key Cell newCell = new KeyValue(newRow, 0, newRow.length, - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), - cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), + cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); switch (KeyValue.Type.codeToType(cell.getTypeByte())) { case Put: // If Put, point delete old Put Delete del = new Delete(oldRow); del.addDeleteMarker(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete, - ByteUtil.EMPTY_BYTE_ARRAY, 0, 0)); + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete, + ByteUtil.EMPTY_BYTE_ARRAY, 0, 0)); mutations.add(del); - + Put put = new Put(newRow); put.add(newCell); mutations.add(put); @@ -391,13 +400,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (!results.isEmpty()) { result.getKey(ptr); ValueGetter valueGetter = - maintainer.createGetterFromKeyValues( - ImmutableBytesPtr.copyBytesIfNecessary(ptr), - results); + maintainer.createGetterFromKeyValues( + ImmutableBytesPtr.copyBytesIfNecessary(ptr), + results); Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, - c.getEnvironment().getRegion().getRegionInfo().getStartKey(), - c.getEnvironment().getRegion().getRegionInfo().getEndKey()); + env.getRegion().getRegionInfo().getStartKey(), + env.getRegion().getRegionInfo().getEndKey()); indexMutations.add(put); } } @@ -406,12 +415,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // FIXME: the version of the Delete constructor without the lock // args was introduced in 0.94.4, thus if we try to use it here // we can no longer use the 0.94.2 version of the client. - Cell firstKV = results.get(0); - Delete delete = new Delete(firstKV.getRowArray(), - firstKV.getRowOffset(), firstKV.getRowLength(),ts); - mutations.add(delete); - // force tephra to ignore this deletes - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + Cell firstKV = results.get(0); + Delete delete = new Delete(firstKV.getRowArray(), + firstKV.getRowOffset(), firstKV.getRowLength(),ts); + mutations.add(delete); + // force tephra to ignore this deletes + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); } else if (isUpsert) { Arrays.fill(values, null); int i = 0; @@ -423,7 +432,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // If SortOrder from expression in SELECT doesn't match the // column being projected into then invert the bits. if (expression.getSortOrder() != - projectedColumns.get(i).getSortOrder()) { + projectedColumns.get(i).getSortOrder()) { SortOrder.invert(values[i], 0, values[i], 0, values[i].length); } @@ -436,20 +445,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (expression.evaluate(result, ptr)) { PColumn column = projectedColumns.get(i); Object value = expression.getDataType() - .toObject(ptr, column.getSortOrder()); + .toObject(ptr, column.getSortOrder()); // We are guaranteed that the two column will have the // same type. if (!column.getDataType().isSizeCompatible(ptr, value, - column.getDataType(), expression.getMaxLength(), - expression.getScale(), column.getMaxLength(), - column.getScale())) { + column.getDataType(), expression.getMaxLength(), + expression.getScale(), column.getMaxLength(), + column.getScale())) { throw new DataExceedsCapacityException( column.getDataType(), column.getMaxLength(), column.getScale()); } column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(), - expression.getScale(), expression.getSortOrder(), + expression.getScale(), expression.getSortOrder(), column.getMaxLength(), column.getScale(), column.getSortOrder(), projectedTable.rowKeyOrderOptimizable()); byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); @@ -486,7 +495,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ * We insert one empty key value per row per timestamp. */ Set<Long> timeStamps = - Sets.newHashSetWithExpectedSize(results.size()); + Sets.newHashSetWithExpectedSize(results.size()); for (Cell kv : results) { long kvts = kv.getTimestamp(); if (!timeStamps.contains(kvts)) { @@ -512,37 +521,25 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } catch (ConstraintViolationException e) { // Log and ignore in count logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + - region.getRegionInfo().getRegionNameAsString() + " with values " + - SchemaUtil.toString(values), - ScanUtil.getCustomAnnotations(scan)), e); + region.getRegionInfo().getRegionNameAsString() + " with values " + + SchemaUtil.toString(values), + ScanUtil.getCustomAnnotations(scan)), e); continue; } aggregators.aggregate(rowAggregators, result); hasAny = true; } } while (hasMore); - success = true; } } finally { try { - if (success && updateStats) { - try { - stats.updateStatistic(region); - } finally { - stats.close(); - } - } + innerScanner.close(); } finally { - try { - innerScanner.close(); - } finally { - region.closeRegionOperation(); - } + region.closeRegionOperation(); } } - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); + logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); } if (!mutations.isEmpty()) { @@ -589,7 +586,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @Override public long getMaxResultSize() { - return scan.getMaxResultSize(); + return scan.getMaxResultSize(); } @Override @@ -647,7 +644,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ : StatisticsCollector.NO_TIMESTAMP; StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(), clientTimeStamp, store.getFamily().getName()); - internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner, + internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner, mergeRegions); } catch (IOException e) { // If we can't reach the stats table, don't interrupt the normal @@ -663,12 +660,30 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @Override public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> e, final Region l, - final Region r) throws IOException { - final Region region = e.getEnvironment().getRegion(); + final Region r) throws IOException { + final Configuration config = e.getEnvironment().getConfiguration(); + boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + if (!async) { + splitStatsInternal(e, l, r); + } else { + StatisticsCollectionRunTracker.getInstance(config).runTask(new Callable<Void>() { + @Override + public Void call() throws Exception { + splitStatsInternal(e, l, r); + return null; + } + }); + } + } + + private void splitStatsInternal(final ObserverContext<RegionCoprocessorEnvironment> e, + final Region l, final Region r) { + final RegionCoprocessorEnvironment env = e.getEnvironment(); + final Region region = env.getRegion(); final TableName table = region.getRegionInfo().getTable(); try { boolean useCurrentTime = - e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + env.getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); // Provides a means of clients controlling their timestamps to not use current time // when background tasks are updating stats. Instead we track the max timestamp of @@ -678,7 +693,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { - StatisticsCollector stats = new StatisticsCollector(e.getEnvironment(), + StatisticsCollector stats = new StatisticsCollector(env, table.getNameAsString(), clientTimeStamp); try { stats.splitStats(region, l, r); @@ -695,6 +710,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } } + private static PTable deserializeTable(byte[] b) { try { PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); @@ -703,6 +719,152 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ throw new RuntimeException(e); } } + + private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, + final Region region, final Scan scan, Configuration config) throws IOException { + StatsCollectionCallable callable = + new StatsCollectionCallable(stats, region, innerScanner, config); + byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC); + boolean async = false; + if (asyncBytes != null) { + async = Bytes.toBoolean(asyncBytes); + } + long rowCount = 0; // in case of async, we report 0 as number of rows updated + StatisticsCollectionRunTracker statsRunTracker = + StatisticsCollectionRunTracker.getInstance(config); + boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo()); + if (runUpdateStats) { + if (!async) { + rowCount = callable.call(); + } else { + statsRunTracker.runTask(callable); + } + } else { + rowCount = CONCURRENT_UPDATE_STATS_ROW_COUNT; + logger.info("UPDATE STATISTICS didn't run because another UPDATE STATISTICS command was already running on the region " + + region.getRegionInfo().getRegionNameAsString()); + } + byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); + final KeyValue aggKeyValue = + KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + RegionScanner scanner = new BaseRegionScanner() { + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { + return true; + } + + @Override + public void close() throws IOException { + // no-op because we want to manage closing of the inner scanner ourselves. + } + + @Override + public boolean next(List<Cell> results) throws IOException { + results.add(aggKeyValue); + return false; + } + + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } + + @Override + public int getBatch() { + return innerScanner.getBatch(); + } + }; + return scanner; + } + + /** + * + * Callable to encapsulate the collection of stats triggered by + * UPDATE STATISTICS command. + * + * Package private for tests. + */ + static class StatsCollectionCallable implements Callable<Long> { + private final StatisticsCollector stats; + private final Region region; + private final RegionScanner innerScanner; + private final Configuration config; + + StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs, + Configuration config) { + this.stats = s; + this.region = r; + this.innerScanner = rs; + this.config = config; + } + + @Override + public Long call() throws IOException { + return collectStatsInternal(); + } + + private boolean areStatsBeingCollectedViaCompaction() { + return StatisticsCollectionRunTracker.getInstance(config) + .areStatsBeingCollectedOnCompaction(region.getRegionInfo()); + } + + private long collectStatsInternal() throws IOException { + long startTime = System.currentTimeMillis(); + region.startRegionOperation(); + boolean hasMore = false; + boolean noErrors = false; + boolean compactionRunning = areStatsBeingCollectedViaCompaction(); + long rowCount = 0; + try { + if (!compactionRunning) { + synchronized (innerScanner) { + do { + List<Cell> results = new ArrayList<Cell>(); + hasMore = innerScanner.nextRaw(results); + stats.collectStatistics(results); + rowCount++; + compactionRunning = areStatsBeingCollectedViaCompaction(); + } while (hasMore && !compactionRunning); + noErrors = true; + } + } + return compactionRunning ? COMPACTION_UPDATE_STATS_ROW_COUNT : rowCount; + } catch (IOException e) { + logger.error("IOException in update stats: " + Throwables.getStackTraceAsString(e)); + throw e; + } finally { + try { + if (noErrors && !compactionRunning) { + stats.updateStatistic(region); + logger.info("UPDATE STATISTICS finished successfully for scanner: " + + innerScanner + ". Number of rows scanned: " + rowCount + + ". Time: " + (System.currentTimeMillis() - startTime)); + } + if (compactionRunning) { + logger.info("UPDATE STATISTICS stopped in between because major compaction was running for region " + + region.getRegionInfo().getRegionNameAsString()); + } + } finally { + try { + StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo()); + stats.close(); + } finally { + try { + innerScanner.close(); + } finally { + region.closeRegionOperation(); + } + } + } + } + } + } private static List<Expression> deserializeExpressions(byte[] b) { ByteArrayInputStream stream = new ByteArrayInputStream(b); http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 1b05334..84983e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -151,6 +151,9 @@ public interface QueryServices extends SQLCloseable { public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width"; public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region"; public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; + public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async"; + public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size"; + public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async"; public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 13a7beb..6401c01 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; +import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; @@ -54,6 +55,7 @@ import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTR import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB; import static org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB; +import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; @@ -172,6 +174,9 @@ public class QueryServicesOptions { // compression we're getting) public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 3* 100 * 1024 *1024; public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true; + public static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = true; + public static final boolean DEFAULT_COMMIT_STATS_ASYNC = true; + public static final int DEFAULT_STATS_POOL_SIZE = 4; public static final boolean DEFAULT_USE_REVERSE_SCAN = true; @@ -247,6 +252,8 @@ public class QueryServicesOptions { Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); QueryServicesOptions options = new QueryServicesOptions(config) .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME) + .setIfUnset(RUN_UPDATE_STATS_ASYNC, DEFAULT_RUN_UPDATE_STATS_ASYNC) + .setIfUnset(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC) .setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS) .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE) .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE) @@ -575,4 +582,14 @@ public class QueryServicesOptions { config.set(EXTRA_JDBC_ARGUMENTS_ATTRIB, extraArgs); return this; } + + public QueryServicesOptions setRunUpdateStatsAsync(boolean flag) { + config.setBoolean(RUN_UPDATE_STATS_ASYNC, flag); + return this; + } + + public QueryServicesOptions setCommitStatsAsync(boolean flag) { + config.setBoolean(COMMIT_STATS_ASYNC, flag); + return this; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 9c365ab..35d61ff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -904,7 +904,13 @@ public class MetaDataClient { } } } - return new MutationState((int)rowCount, connection); + final long count = rowCount; + return new MutationState(1, connection) { + @Override + public long getUpdateCount() { + return count; + } + }; } private long updateStatisticsInternal(PName physicalName, PTable logicalTable, Map<String, Object> statsProps) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java new file mode 100644 index 0000000..5f6be3f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; + +/** + * Singleton that is used to track state associated with regions undergoing stats collection at the + * region server's JVM level. + */ +public class StatisticsCollectionRunTracker { + private static volatile StatisticsCollectionRunTracker INSTANCE; + private final Set<HRegionInfo> updateStatsRegions = Collections + .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>()); + private final Set<HRegionInfo> compactingRegions = Collections + .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>()); + private final ExecutorService executor; + + // Constants added for testing purposes + public static final long CONCURRENT_UPDATE_STATS_ROW_COUNT = -100l; + public static final long COMPACTION_UPDATE_STATS_ROW_COUNT = -200l; + + public static StatisticsCollectionRunTracker getInstance(Configuration config) { + StatisticsCollectionRunTracker result = INSTANCE; + if (result == null) { + synchronized (StatisticsCollectionRunTracker.class) { + result = INSTANCE; + if (result == null) { + INSTANCE = result = new StatisticsCollectionRunTracker(config); + } + } + } + return result; + } + + private StatisticsCollectionRunTracker(Configuration config) { + int poolSize = + config.getInt(QueryServices.STATS_SERVER_POOL_SIZE, + QueryServicesOptions.DEFAULT_STATS_POOL_SIZE); + executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }); + } + + /** + * @param regionInfo for the region that should be marked as undergoing stats collection via + * major compaction. + * @return true if the region wasn't already marked for stats collection via compaction, false + * otherwise. + */ + public boolean addCompactingRegion(HRegionInfo regionInfo) { + return compactingRegions.add(regionInfo); + } + + /** + * @param regionInfo for the region that should be unmarked as undergoing stats collection via + * major compaction. + * @return true if the region was marked for stats collection via compaction, false otherwise. + */ + public boolean removeCompactingRegion(HRegionInfo regionInfo) { + return compactingRegions.remove(regionInfo); + } + + /** + * @param regionInfo for the region to check for. + * @return true if stats are being collected for the region via major compaction, false + * otherwise. + */ + public boolean areStatsBeingCollectedOnCompaction(HRegionInfo regionInfo) { + return compactingRegions.contains(regionInfo); + } + + /** + * @param regionInfo for the region to run UPDATE STATISTICS command on. + * @return true if UPDATE STATISTICS wasn't already running on the region, false otherwise. + */ + public boolean addUpdateStatsCommandRegion(HRegionInfo regionInfo) { + return updateStatsRegions.add(regionInfo); + } + + /** + * @param regionInfo for the region to mark as not running UPDATE STATISTICS command on. + * @return true if UPDATE STATISTICS was running on the region, false otherwise. + */ + public boolean removeUpdateStatsCommandRegion(HRegionInfo regionInfo) { + return updateStatsRegions.remove(regionInfo); + } + + /** + * Enqueues the task for execution. + * @param <T> + * @param c task to execute + */ + public <T> Future<T> runTask(Callable<T> c) { + return executor.submit(c); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 24e1507..41bdc8b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -216,14 +216,14 @@ public class StatisticsCollector { } } - public InternalScanner createCompactionScanner(Region region, Store store, InternalScanner s, + public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner s, Pair<HRegionInfo, HRegionInfo> mergeRegions) throws IOException { // See if this is for Major compaction if (logger.isDebugEnabled()) { logger.debug("Compaction scanner created for stats"); } ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName()); - return getInternalScanner(region, store, s, cfKey, mergeRegions); + return getInternalScanner(env, store, s, cfKey, mergeRegions); } public void splitStats(Region parent, Region left, Region right) { @@ -245,9 +245,9 @@ public class StatisticsCollector { } } - protected InternalScanner getInternalScanner(Region region, Store store, InternalScanner internalScan, + protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner internalScan, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) { - return new StatisticsScanner(this, statsTable, region, internalScan, family, mergeRegions); + return new StatisticsScanner(this, statsTable, env, internalScan, family, mergeRegions); } public void clear() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 761b388..876d024 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -17,16 +17,22 @@ */ package org.apache.phoenix.schema.stats; +import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC; + import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -44,15 +50,18 @@ public class StatisticsScanner implements InternalScanner { private StatisticsCollector tracker; private ImmutableBytesPtr family; private Pair<HRegionInfo, HRegionInfo> mergeRegions; + private final Configuration config; - public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, Region region, + public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env, InternalScanner delegate, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) { this.tracker = tracker; this.stats = stats; this.delegate = delegate; - this.region = region; + this.region = env.getRegion(); this.family = family; this.mergeRegions = mergeRegions; + this.config = env.getConfiguration(); + StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(region.getRegionInfo()); } @Override @@ -83,62 +92,80 @@ public class StatisticsScanner implements InternalScanner { @Override public void close() throws IOException { - IOException toThrow = null; - try { - // update the statistics table - // Just verify if this if fine - ArrayList<Mutation> mutations = new ArrayList<Mutation>(); - if (mergeRegions != null) { - if (mergeRegions.getFirst() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting stale stats for the region " - + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction"); + boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config); + StatisticsScannerCallable callable = new StatisticsScannerCallable(); + if (!async) { + callable.call(); + } else { + collectionTracker.runTask(callable); + } + } + + private class StatisticsScannerCallable implements Callable<Void> { + @Override + public Void call() throws IOException { + IOException toThrow = null; + StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config); + final HRegionInfo regionInfo = region.getRegionInfo(); + try { + // update the statistics table + // Just verify if this if fine + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + if (mergeRegions != null) { + if (mergeRegions.getFirst() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting stale stats for the region " + + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction"); + } + stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations); } - stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations); - } - if (mergeRegions.getSecond() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting stale stats for the region " - + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction"); + if (mergeRegions.getSecond() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting stale stats for the region " + + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction"); + } + stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations); } - stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations); } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting the stats for the region " + region.getRegionInfo().getRegionNameAsString() + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); - } - stats.deleteStats(region.getRegionInfo().getRegionName(), this.tracker, family, mutations); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding new stats for the region " + region.getRegionInfo().getRegionNameAsString() + } + stats.deleteStats(regionInfo.getRegionName(), tracker, family, mutations); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); - } - stats.addStats(region.getRegionInfo().getRegionName(), this.tracker, family, mutations); - if (LOG.isDebugEnabled()) { - LOG.debug("Committing new stats for the region " + region.getRegionInfo().getRegionNameAsString() + } + stats.addStats(regionInfo.getRegionName(), tracker, family, mutations); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); - } - stats.commitStats(mutations); - } catch (IOException e) { - LOG.error("Failed to update statistics table!", e); - toThrow = e; - } finally { - try { - stats.close(); + } + stats.commitStats(mutations); } catch (IOException e) { - if (toThrow == null) toThrow = e; - LOG.error("Error while closing the stats table", e); + LOG.error("Failed to update statistics table!", e); + toThrow = e; } finally { - // close the delegate scanner try { - delegate.close(); + collectionTracker.removeCompactingRegion(regionInfo); + stats.close(); } catch (IOException e) { if (toThrow == null) toThrow = e; - LOG.error("Error while closing the scanner", e); + LOG.error("Error while closing the stats table", e); } finally { - if (toThrow != null) { throw toThrow; } + // close the delegate scanner + try { + delegate.close(); + } catch (IOException e) { + if (toThrow == null) toThrow = e; + LOG.error("Error while closing the scanner", e); + } finally { + if (toThrow != null) { throw toThrow; } + } } } + return null; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index 834675c..22d1ab7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -24,11 +24,8 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; http://git-wip-us.apache.org/repos/asf/phoenix/blob/05ff5618/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 5289ab9..215110c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -55,6 +55,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0; public static final boolean DEFAULT_EXPLAIN_CHUNK_COUNT = false; // TODO: update explain plans in test and set to true public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; + private static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = false; + private static final boolean DEFAULT_COMMIT_STATS_ASYNC = false; /** @@ -93,7 +95,9 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE) .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE) .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER) - .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS); + .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS) + .setRunUpdateStatsAsync(DEFAULT_RUN_UPDATE_STATS_ASYNC) + .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC); } public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {