PHOENIX-1261 Update stats table asynchronously
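This change moves statistics work off the synchronous path: the UPDATE STATISTICS command, and the stats commits triggered by major compaction and region splits, can now run on a server-side thread pool managed by the new StatisticsCollectionRunTracker. The behaviour is controlled by three properties added to QueryServices in this patch. A minimal sketch of those knobs and their defaults follows; setting them programmatically here is only for illustration (on a cluster they would normally live in hbase-site.xml), and the class name is a placeholder, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class AsyncStatsConfigSketch {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();

        // Whether UPDATE STATISTICS runs asynchronously; the server reads the choice
        // from the _RunUpdateStatsAsync scan attribute. Default: true.
        conf.setBoolean("phoenix.update.stats.command.async", false);

        // Whether stats collected during major compaction / region split are committed
        // asynchronously on the tracker's thread pool. Default: true.
        conf.setBoolean("phoenix.stats.commit.async", false);

        // Size of the thread pool used by StatisticsCollectionRunTracker. Default: 4.
        conf.setInt("phoenix.stats.pool.size", 4);

        System.out.println("stats pool size = " + conf.getInt("phoenix.stats.pool.size", 4));
    }
}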
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/08aa07f5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/08aa07f5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/08aa07f5 Branch: refs/heads/4.x-HBase-1.0 Commit: 08aa07f55395eb0d1195ef5646a4669947b4df05 Parents: 7c76bba Author: Samarth <[email protected]> Authored: Tue Dec 22 13:56:56 2015 -0800 Committer: Samarth <[email protected]> Committed: Tue Dec 22 13:56:56 2015 -0800 ---------------------------------------------------------------------- .../StatisticsCollectionRunTrackerIT.java | 167 ++++++++ .../phoenix/end2end/StatsCollectorIT.java | 6 +- .../org/apache/phoenix/cache/GlobalCache.java | 7 +- .../coprocessor/BaseScannerRegionObserver.java | 3 +- .../UngroupedAggregateRegionObserver.java | 378 +++++++++++++------ .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 17 + .../apache/phoenix/schema/MetaDataClient.java | 8 +- .../stats/StatisticsCollectionRunTracker.java | 130 +++++++ .../schema/stats/StatisticsCollector.java | 8 +- .../phoenix/schema/stats/StatisticsScanner.java | 121 +++--- .../phoenix/schema/stats/StatisticsWriter.java | 3 - .../phoenix/query/QueryServicesTestImpl.java | 6 +- 13 files changed, 684 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java new file mode 100644 index 0000000..c64038e --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/coprocessor/StatisticsCollectionRunTrackerIT.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class StatisticsCollectionRunTrackerIT extends BaseOwnClusterHBaseManagedTimeIT { + @BeforeClass + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1024)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testStateBeforeAndAfterUpdateStatsCommand() throws Exception { + String tableName = "testStateBeforeAndAfterUpdateStatsCommand".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // assert that the region wasn't added to the tracker + assertTrue(tracker.addUpdateStatsCommandRegion(regionInfo)); + // assert that removing the region from the tracker works + assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo)); + runUpdateStats(tableName); + // assert that after update stats is complete, tracker isn't tracking the region any more + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo)); + } + + @Test + public void testStateBeforeAndAfterMajorCompaction() throws Exception { + String tableName = "testStateBeforeAndAfterMajorCompaction".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // Upsert values in the table. 
+ String keyPrefix = "aaaaaaaaaaaaaaaaaaaa"; + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(upsert); + for (int i = 0; i < 1000; i++) { + stmt.setString(1, keyPrefix + i); + stmt.setString(2, "KV" + i); + stmt.executeUpdate(); + } + conn.commit(); + } + // assert that the region wasn't added to the tracker + assertTrue(tracker.addCompactingRegion(regionInfo)); + // assert that removing the region from the tracker works + assertTrue(tracker.removeCompactingRegion(regionInfo)); + runMajorCompaction(tableName); + + // assert that after major compaction is complete, tracker isn't tracking the region any more + assertFalse(tracker.removeCompactingRegion(regionInfo)); + } + + @Test + public void testMajorCompactionPreventsUpdateStatsFromRunning() throws Exception { + String tableName = "testMajorCompactionPreventsUpdateStatsFromRunning".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + // simulate stats collection via major compaction by marking the region as compacting in the tracker + markRegionAsCompacting(regionInfo); + Assert.assertEquals("Row count didn't match", COMPACTION_UPDATE_STATS_ROW_COUNT, runUpdateStats(tableName)); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // assert that the tracker state was cleared. + assertFalse(tracker.removeUpdateStatsCommandRegion(regionInfo)); + } + + @Test + public void testUpdateStatsPreventsAnotherUpdateStatsFromRunning() throws Exception { + String tableName = "testUpdateStatsPreventsAnotherUpdateStatsFromRunning".toUpperCase(); + HRegionInfo regionInfo = createTableAndGetRegion(tableName); + markRunningUpdateStats(regionInfo); + Assert.assertEquals("Row count didn't match", CONCURRENT_UPDATE_STATS_ROW_COUNT, + runUpdateStats(tableName)); + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + // assert that running the concurrent and race-losing update stats didn't clear the region + // from the tracker. If the method returned true it means the tracker was still tracking + // the region. Slightly counter-intuitive, yes. 
+ assertTrue(tracker.removeUpdateStatsCommandRegion(regionInfo)); + } + + private void markRegionAsCompacting(HRegionInfo regionInfo) { + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + tracker.addCompactingRegion(regionInfo); + } + + private void markRunningUpdateStats(HRegionInfo regionInfo) { + StatisticsCollectionRunTracker tracker = + StatisticsCollectionRunTracker.getInstance(new Configuration()); + tracker.addUpdateStatsCommandRegion(regionInfo); + } + + private HRegionInfo createTableAndGetRegion(String tableName) throws Exception { + byte[] tableNameBytes = Bytes.toBytes(tableName); + String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR PRIMARY KEY, KV1 VARCHAR)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(ddl); + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + try (HBaseAdmin admin = phxConn.getQueryServices().getAdmin()) { + List<HRegionInfo> tableRegions = admin.getTableRegions(tableNameBytes); + return tableRegions.get(0); + } + } + } + + private long runUpdateStats(String tableName) throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + return conn.createStatement().executeUpdate("UPDATE STATISTICS " + tableName); + } + } + + private void runMajorCompaction(String tableName) throws Exception { + try (PhoenixConnection conn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) { + try (HBaseAdmin admin = conn.getQueryServices().getAdmin()) { + admin.flush(tableName); + admin.majorCompact(tableName); + Thread.sleep(10000); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java index 23859d6..5e048c5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.ConnectionQueryServices; @@ -241,8 +242,9 @@ public class StatsCollectorIT extends StatsCollectorAbstractIT { ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); HBaseAdmin admin = services.getAdmin(); try { - admin.flush(tableName); - admin.majorCompact(tableName); + TableName t = TableName.valueOf(tableName); + admin.flush(t); + admin.majorCompact(t); Thread.sleep(10000); // FIXME: how do we know when compaction is done? 
} finally { admin.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index 643112d..af5438c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -32,11 +32,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.ChildMemoryManager; import org.apache.phoenix.memory.GlobalMemoryManager; -import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PMetaDataEntity; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.SizedUtil; import com.google.common.cache.Cache; @@ -53,13 +51,13 @@ import com.google.common.cache.Weigher; * @since 0.1 */ public class GlobalCache extends TenantCacheImpl { - private static GlobalCache INSTANCE; + private static volatile GlobalCache INSTANCE; private final Configuration config; // TODO: Use Guava cache with auto removal after lack of access private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>(); // Cache for lastest PTable for a given Phoenix table - private Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; + private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache; public void clearTenantCache() { perTenantCacheMap.clear(); @@ -177,4 +175,5 @@ public class GlobalCache extends TenantCacheImpl { return super.hashCode(); } } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index f98499f..5ab432c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -92,6 +92,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION"; public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY"; public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER"; + public static final String RUN_UPDATE_STATS_ASYNC = "_RunUpdateStatsAsync"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations @@ -169,7 +170,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { if (!isRegionObserverFor(scan)) { return s; } - boolean success =false; + boolean success = false; // Save the current span. When done with the child span, reset the span back to // what it was. 
Otherwise, this causes the thread local storing the current span // to not be reset back to null causing catastrophic infinite loops http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a19cac9..562e904 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -21,7 +21,11 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; +import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC; +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; +import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -35,7 +39,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import co.cask.tephra.TxConstants; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; @@ -88,6 +96,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.types.PBinary; @@ -95,6 +104,7 @@ import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -108,19 +118,18 @@ import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import co.cask.tephra.TxConstants; - /** * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY). 
- * - * + * + * * @since 0.1 */ -public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ +public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver { // TODO: move all constants into a single class public static final String UNGROUPED_AGG = "UngroupedAgg"; public static final String DELETE_AGG = "DeleteAgg"; @@ -131,7 +140,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ public static final String EMPTY_CF = "EmptyCF"; private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); private KeyValueBuilder kvBuilder; - + @Override public void start(CoprocessorEnvironment e) throws IOException { super.start(e); @@ -141,28 +150,28 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException { - if (indexUUID != null) { - for (Mutation m : mutations) { - m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); - } - } - Mutation[] mutationArray = new Mutation[mutations.size()]; - // TODO: should we use the one that is all or none? - logger.warn("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); - region.batchMutate(mutations.toArray(mutationArray)); + if (indexUUID != null) { + for (Mutation m : mutations) { + m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); + } + } + Mutation[] mutationArray = new Mutation[mutations.size()]; + // TODO: should we use the one that is all or none? + logger.warn("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); + region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } public static void serializeIntoScan(Scan scan) { scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE); } - + @Override public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) throws IOException { s = super.preScannerOpen(e, scan, s); if (ScanUtil.isAnalyzeTable(scan)) { // We are setting the start row and stop row such that it covers the entire region. As part - // of Phonenix-1263 we are storing the guideposts against the physical table rather than + // of Phonenix-1263 we are storing the guideposts against the physical table rather than // individual tenant specific tables. 
scan.setStartRow(HConstants.EMPTY_START_ROW); scan.setStopRow(HConstants.EMPTY_END_ROW); @@ -170,17 +179,22 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } return s; } - + @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { - HRegion region = c.getEnvironment().getRegion(); + RegionCoprocessorEnvironment env = c.getEnvironment(); + HRegion region = env.getRegion(); long ts = scan.getTimeRange().getMax(); - StatisticsCollector stats = null; - if(ScanUtil.isAnalyzeTable(scan)) { - byte[] gp_width_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES); - byte[] gp_per_region_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION); + if (ScanUtil.isAnalyzeTable(scan)) { + byte[] gp_width_bytes = + scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES); + byte[] gp_per_region_bytes = + scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION); // Let this throw, as this scan is being done for the sole purpose of collecting stats - stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); + StatisticsCollector statsCollector = + new StatisticsCollector(env, region.getRegionInfo().getTable() + .getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); + return collectStats(s, statsCollector, region, scan, env.getConfiguration()); } int offsetToBe = 0; if (ScanUtil.isLocalIndex(scan)) { @@ -248,7 +262,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); - dataRegion = IndexUtil.getDataRegion(c.getEnvironment()); + dataRegion = IndexUtil.getDataRegion(env); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); } ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); @@ -258,7 +272,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } if (j != null) { - theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment()); + theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env); } int batchSize = 0; @@ -267,22 +281,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (isDescRowKeyOrderUpgrade || isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { // TODO: size better mutations = Lists.newArrayListWithExpectedSize(1024); - batchSize = c.getEnvironment().getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + batchSize = env.getConfiguration().getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); } Aggregators aggregators = ServerAggregators.deserialize( - scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), c.getEnvironment().getConfiguration()); + scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), env.getConfiguration()); Aggregator[] rowAggregators = aggregators.getAggregators(); boolean hasMore; boolean hasAny = false; MultiKeyValueTuple result = new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + 
scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); + logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } long rowCount = 0; final RegionScanner innerScanner = theScanner; region.startRegionOperation(); - boolean updateStats = stats != null; - boolean success = false; try { synchronized (innerScanner) { do { @@ -291,9 +303,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // since this is an indication of whether or not there are more values after the // ones returned hasMore = innerScanner.nextRaw(results); - if (updateStats) { - stats.collectStatistics(results); - } if (!results.isEmpty()) { rowCount++; result.setKeyValues(results); @@ -313,8 +322,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case if (field.getDataType().isArrayType()) { field.getDataType().coerceBytes(ptr, null, field.getDataType(), - field.getMaxLength(), field.getScale(), field.getSortOrder(), - field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte + field.getMaxLength(), field.getScale(), field.getSortOrder(), + field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte } // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) { @@ -323,7 +332,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ len--; } ptr.set(ptr.get(), ptr.getOffset(), len); - // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) + // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) { byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()); ptr.set(invertedBytes); @@ -342,8 +351,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } writeToTable.newKey(ptr, values); if (Bytes.compareTo( - firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), - ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { + firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), + ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { continue; } byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr); @@ -357,21 +366,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ for (Cell cell : results) { // Copy existing cell but with new row key Cell newCell = new KeyValue(newRow, 0, newRow.length, - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), - cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), - cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), + cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength()); switch (KeyValue.Type.codeToType(cell.getTypeByte())) { case Put: // If Put, point delete old Put Delete del = new Delete(oldRow); del.addDeleteMarker(new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete, - ByteUtil.EMPTY_BYTE_ARRAY, 0, 0)); + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), KeyValue.Type.Delete, + ByteUtil.EMPTY_BYTE_ARRAY, 0, 0)); mutations.add(del); - + Put put = new Put(newRow); put.add(newCell); mutations.add(put); @@ -391,13 +400,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (!results.isEmpty()) { result.getKey(ptr); ValueGetter valueGetter = - maintainer.createGetterFromKeyValues( - ImmutableBytesPtr.copyBytesIfNecessary(ptr), - results); + maintainer.createGetterFromKeyValues( + ImmutableBytesPtr.copyBytesIfNecessary(ptr), + results); Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, - c.getEnvironment().getRegion().getStartKey(), - c.getEnvironment().getRegion().getEndKey()); + env.getRegion().getRegionInfo().getStartKey(), + env.getRegion().getRegionInfo().getEndKey()); indexMutations.add(put); } } @@ -406,12 +415,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // FIXME: the version of the Delete constructor without the lock // args was introduced in 0.94.4, thus if we try to use it here // we can no longer use the 0.94.2 version of the client. - Cell firstKV = results.get(0); - Delete delete = new Delete(firstKV.getRowArray(), - firstKV.getRowOffset(), firstKV.getRowLength(),ts); - mutations.add(delete); - // force tephra to ignore this deletes - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + Cell firstKV = results.get(0); + Delete delete = new Delete(firstKV.getRowArray(), + firstKV.getRowOffset(), firstKV.getRowLength(),ts); + mutations.add(delete); + // force tephra to ignore this deletes + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); } else if (isUpsert) { Arrays.fill(values, null); int i = 0; @@ -423,7 +432,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // If SortOrder from expression in SELECT doesn't match the // column being projected into then invert the bits. if (expression.getSortOrder() != - projectedColumns.get(i).getSortOrder()) { + projectedColumns.get(i).getSortOrder()) { SortOrder.invert(values[i], 0, values[i], 0, values[i].length); } @@ -436,13 +445,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (expression.evaluate(result, ptr)) { PColumn column = projectedColumns.get(i); Object value = expression.getDataType() - .toObject(ptr, column.getSortOrder()); + .toObject(ptr, column.getSortOrder()); // We are guaranteed that the two column will have the // same type. 
if (!column.getDataType().isSizeCompatible(ptr, value, - column.getDataType(), expression.getMaxLength(), - expression.getScale(), column.getMaxLength(), - column.getScale())) { + column.getDataType(), expression.getMaxLength(), + expression.getScale(), column.getMaxLength(), + column.getScale())) { throw new DataExceedsCapacityException( column.getDataType(), column.getMaxLength(), column.getScale()); @@ -486,7 +495,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ * We insert one empty key value per row per timestamp. */ Set<Long> timeStamps = - Sets.newHashSetWithExpectedSize(results.size()); + Sets.newHashSetWithExpectedSize(results.size()); for (Cell kv : results) { long kvts = kv.getTimestamp(); if (!timeStamps.contains(kvts)) { @@ -512,37 +521,25 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } catch (ConstraintViolationException e) { // Log and ignore in count logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + - region.getRegionNameAsString() + " with values " + - SchemaUtil.toString(values), - ScanUtil.getCustomAnnotations(scan)), e); + region.getRegionInfo().getRegionNameAsString() + " with values " + + SchemaUtil.toString(values), + ScanUtil.getCustomAnnotations(scan)), e); continue; } aggregators.aggregate(rowAggregators, result); hasAny = true; } } while (hasMore); - success = true; } } finally { try { - if (success && updateStats) { - try { - stats.updateStatistic(region); - } finally { - stats.close(); - } - } + innerScanner.close(); } finally { - try { - innerScanner.close(); - } finally { - region.closeRegionOperation(); - } + region.closeRegionOperation(); } } - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); + logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); } if (!mutations.isEmpty()) { @@ -560,7 +557,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); } final KeyValue aggKeyValue = keyValue; - + RegionScanner scanner = new BaseRegionScanner() { private boolean done = !hadAny; @@ -586,10 +583,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ results.add(aggKeyValue); return false; } - + @Override public long getMaxResultSize() { - return scan.getMaxResultSize(); + return scan.getMaxResultSize(); } }; return scanner; @@ -610,7 +607,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ table = c.getEnvironment().getTable(indexTable); table.batch(indexMutations); } catch (InterruptedException ie) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionInfo().getRegionNameAsString(), ie); } finally { if (table != null) table.close(); @@ -618,7 +615,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } indexMutations.clear(); } - + @Override public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, final Store store, InternalScanner scanner, final ScanType scanType) @@ -643,7 +640,7 @@ public class UngroupedAggregateRegionObserver 
extends BaseScannerRegionObserver{ StatisticsCollector stats = new StatisticsCollector( c.getEnvironment(), table.getNameAsString(), clientTimeStamp, store.getFamily().getName()); - internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner, mergeRegions); + internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner, mergeRegions); } catch (IOException e) { // If we can't reach the stats table, don't interrupt the normal // compaction operation, just log a warning. @@ -654,48 +651,209 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } return internalScanner; } - - + + @Override public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> e, final HRegion l, final HRegion r) throws IOException { - final HRegion region = e.getEnvironment().getRegion(); + final Configuration config = e.getEnvironment().getConfiguration(); + boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + if (!async) { + splitStatsInternal(e, l, r); + } else { + StatisticsCollectionRunTracker.getInstance(config).runTask(new Callable<Void>() { + @Override + public Void call() throws Exception { + splitStatsInternal(e, l, r); + return null; + } + }); + } + } + + private void splitStatsInternal(final ObserverContext<RegionCoprocessorEnvironment> e, + final HRegion l, final HRegion r) { + final RegionCoprocessorEnvironment env = e.getEnvironment(); + final HRegion region = env.getRegion(); final TableName table = region.getRegionInfo().getTable(); try { - boolean useCurrentTime = - e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + boolean useCurrentTime = + env.getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); // Provides a means of clients controlling their timestamps to not use current time // when background tasks are updating stats. Instead we track the max timestamp of // the cells and use that. - final long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP; + final long clientTimeStamp = useCurrentTime ? 
TimeKeeper.SYSTEM.getCurrentTime() : + StatisticsCollector.NO_TIMESTAMP; User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - StatisticsCollector stats = new StatisticsCollector(e.getEnvironment(), - table.getNameAsString(), clientTimeStamp); - try { - stats.splitStats(region, l, r); - return null; - } finally { - if (stats != null) stats.close(); - } - } - }); - } catch (IOException ioe) { + @Override + public Void run() throws Exception { + StatisticsCollector stats = new StatisticsCollector(env, + table.getNameAsString(), clientTimeStamp); + try { + stats.splitStats(region, l, r); + return null; + } finally { + if (stats != null) stats.close(); + } + } + }); + } catch (IOException ioe) { if(logger.isWarnEnabled()) { logger.warn("Error while collecting stats during split for " + table,ioe); } - } + } } + private static PTable deserializeTable(byte[] b) { try { PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); return PTableImpl.createFromProto(ptableProto); } catch (IOException e) { throw new RuntimeException(e); - } + } + } + + private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, + final HRegion region, final Scan scan, Configuration config) throws IOException { + StatsCollectionCallable callable = + new StatsCollectionCallable(stats, region, innerScanner, config); + byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC); + boolean async = false; + if (asyncBytes != null) { + async = Bytes.toBoolean(asyncBytes); + } + long rowCount = 0; // in case of async, we report 0 as number of rows updated + StatisticsCollectionRunTracker statsRunTracker = + StatisticsCollectionRunTracker.getInstance(config); + boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo()); + if (runUpdateStats) { + if (!async) { + rowCount = callable.call(); + } else { + statsRunTracker.runTask(callable); + } + } else { + rowCount = CONCURRENT_UPDATE_STATS_ROW_COUNT; + logger.info("UPDATE STATISTICS didn't run because another UPDATE STATISTICS command was already running on the region " + + region.getRegionInfo().getRegionNameAsString()); + } + byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); + final KeyValue aggKeyValue = + KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + RegionScanner scanner = new BaseRegionScanner() { + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { + return true; + } + + @Override + public void close() throws IOException { + // no-op because we want to manage closing of the inner scanner ourselves. + } + + @Override + public boolean next(List<Cell> results) throws IOException { + results.add(aggKeyValue); + return false; + } + + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } + }; + return scanner; + } + + /** + * + * Callable to encapsulate the collection of stats triggered by + * UPDATE STATISTICS command. + * + * Package private for tests. 
+ */ + static class StatsCollectionCallable implements Callable<Long> { + private final StatisticsCollector stats; + private final HRegion region; + private final RegionScanner innerScanner; + private final Configuration config; + + StatsCollectionCallable(StatisticsCollector s, HRegion r, RegionScanner rs, + Configuration config) { + this.stats = s; + this.region = r; + this.innerScanner = rs; + this.config = config; + } + + @Override + public Long call() throws IOException { + return collectStatsInternal(); + } + + private boolean areStatsBeingCollectedViaCompaction() { + return StatisticsCollectionRunTracker.getInstance(config) + .areStatsBeingCollectedOnCompaction(region.getRegionInfo()); + } + + private long collectStatsInternal() throws IOException { + long startTime = System.currentTimeMillis(); + region.startRegionOperation(); + boolean hasMore = false; + boolean noErrors = false; + boolean compactionRunning = areStatsBeingCollectedViaCompaction(); + long rowCount = 0; + try { + if (!compactionRunning) { + synchronized (innerScanner) { + do { + List<Cell> results = new ArrayList<Cell>(); + hasMore = innerScanner.nextRaw(results); + stats.collectStatistics(results); + rowCount++; + compactionRunning = areStatsBeingCollectedViaCompaction(); + } while (hasMore && !compactionRunning); + noErrors = true; + } + } + return compactionRunning ? COMPACTION_UPDATE_STATS_ROW_COUNT : rowCount; + } catch (IOException e) { + logger.error("IOException in update stats: " + Throwables.getStackTraceAsString(e)); + throw e; + } finally { + try { + if (noErrors && !compactionRunning) { + stats.updateStatistic(region); + logger.info("UPDATE STATISTICS finished successfully for scanner: " + + innerScanner + ". Number of rows scanned: " + rowCount + + ". Time: " + (System.currentTimeMillis() - startTime)); + } + if (compactionRunning) { + logger.info("UPDATE STATISTICS stopped in between because major compaction was running for region " + + region.getRegionInfo().getRegionNameAsString()); + } + } finally { + try { + StatisticsCollectionRunTracker.getInstance(config).removeUpdateStatsCommandRegion(region.getRegionInfo()); + stats.close(); + } finally { + try { + innerScanner.close(); + } finally { + region.closeRegionOperation(); + } + } + } + } + } } private static List<Expression> deserializeExpressions(byte[] b) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 155ad3e..908c479 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -151,6 +151,9 @@ public interface QueryServices extends SQLCloseable { public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width"; public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region"; public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; + public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async"; + public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size"; + public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async"; public static final String 
SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets"; public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 0a0bf07..730a5ef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; +import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; @@ -54,6 +55,7 @@ import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTR import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB; import static org.apache.phoenix.query.QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB; +import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; @@ -173,6 +175,9 @@ public class QueryServicesOptions { // compression we're getting) public static final long DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES = 3* 100 * 1024 *1024; public static final boolean DEFAULT_STATS_USE_CURRENT_TIME = true; + public static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = true; + public static final boolean DEFAULT_COMMIT_STATS_ASYNC = true; + public static final int DEFAULT_STATS_POOL_SIZE = 4; public static final boolean DEFAULT_USE_REVERSE_SCAN = true; @@ -248,6 +253,8 @@ public class QueryServicesOptions { Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); QueryServicesOptions options = new QueryServicesOptions(config) .setIfUnset(STATS_USE_CURRENT_TIME_ATTRIB, DEFAULT_STATS_USE_CURRENT_TIME) + .setIfUnset(RUN_UPDATE_STATS_ASYNC, DEFAULT_RUN_UPDATE_STATS_ASYNC) + .setIfUnset(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC) .setIfUnset(KEEP_ALIVE_MS_ATTRIB, DEFAULT_KEEP_ALIVE_MS) .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE) .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE) @@ -576,4 +583,14 @@ public class QueryServicesOptions { config.set(EXTRA_JDBC_ARGUMENTS_ATTRIB, extraArgs); return this; } + + public QueryServicesOptions setRunUpdateStatsAsync(boolean flag) { + config.setBoolean(RUN_UPDATE_STATS_ASYNC, flag); + return this; + } + + public QueryServicesOptions setCommitStatsAsync(boolean flag) { + config.setBoolean(COMMIT_STATS_ASYNC, flag); + return this; + } } 
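When stats collection is skipped because the region is already being handled, the sentinel row counts defined in StatisticsCollectionRunTracker (-100 for a concurrent UPDATE STATISTICS, -200 for an in-progress major compaction) are surfaced to the client as the statement's update count, which is what StatisticsCollectionRunTrackerIT asserts above and what the MetaDataClient change below enables. A rough sketch of how a caller might interpret that count; the JDBC URL and table name are placeholders, not from the patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class UpdateStatsResultSketch {
    // Sentinels from StatisticsCollectionRunTracker (added in this commit).
    static final long CONCURRENT_UPDATE_STATS_ROW_COUNT = -100L;
    static final long COMPACTION_UPDATE_STATS_ROW_COUNT = -200L;

    public static void main(String[] args) throws Exception {
        // URL and table name are hypothetical placeholders.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement()) {
            long rowCount = stmt.executeUpdate("UPDATE STATISTICS MY_TABLE");
            if (rowCount == CONCURRENT_UPDATE_STATS_ROW_COUNT) {
                System.out.println("Skipped: another UPDATE STATISTICS is running on a region.");
            } else if (rowCount == COMPACTION_UPDATE_STATS_ROW_COUNT) {
                System.out.println("Skipped: stats are being collected by a major compaction.");
            } else {
                // With phoenix.update.stats.command.async=true the server reports 0 rows;
                // otherwise this is the number of rows scanned.
                System.out.println("Rows scanned: " + rowCount);
            }
        }
    }
}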
http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 9c365ab..35d61ff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -904,7 +904,13 @@ public class MetaDataClient { } } } - return new MutationState((int)rowCount, connection); + final long count = rowCount; + return new MutationState(1, connection) { + @Override + public long getUpdateCount() { + return count; + } + }; } private long updateStatisticsInternal(PName physicalName, PTable logicalTable, Map<String, Object> statsProps) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java new file mode 100644 index 0000000..5f6be3f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; + +/** + * Singleton that is used to track state associated with regions undergoing stats collection at the + * region server's JVM level. 
+ */ +public class StatisticsCollectionRunTracker { + private static volatile StatisticsCollectionRunTracker INSTANCE; + private final Set<HRegionInfo> updateStatsRegions = Collections + .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>()); + private final Set<HRegionInfo> compactingRegions = Collections + .newSetFromMap(new ConcurrentHashMap<HRegionInfo, Boolean>()); + private final ExecutorService executor; + + // Constants added for testing purposes + public static final long CONCURRENT_UPDATE_STATS_ROW_COUNT = -100l; + public static final long COMPACTION_UPDATE_STATS_ROW_COUNT = -200l; + + public static StatisticsCollectionRunTracker getInstance(Configuration config) { + StatisticsCollectionRunTracker result = INSTANCE; + if (result == null) { + synchronized (StatisticsCollectionRunTracker.class) { + result = INSTANCE; + if (result == null) { + INSTANCE = result = new StatisticsCollectionRunTracker(config); + } + } + } + return result; + } + + private StatisticsCollectionRunTracker(Configuration config) { + int poolSize = + config.getInt(QueryServices.STATS_SERVER_POOL_SIZE, + QueryServicesOptions.DEFAULT_STATS_POOL_SIZE); + executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }); + } + + /** + * @param regionInfo for the region that should be marked as undergoing stats collection via + * major compaction. + * @return true if the region wasn't already marked for stats collection via compaction, false + * otherwise. + */ + public boolean addCompactingRegion(HRegionInfo regionInfo) { + return compactingRegions.add(regionInfo); + } + + /** + * @param regionInfo for the region that should be unmarked as undergoing stats collection via + * major compaction. + * @return true if the region was marked for stats collection via compaction, false otherwise. + */ + public boolean removeCompactingRegion(HRegionInfo regionInfo) { + return compactingRegions.remove(regionInfo); + } + + /** + * @param regionInfo for the region to check for. + * @return true if stats are being collected for the region via major compaction, false + * otherwise. + */ + public boolean areStatsBeingCollectedOnCompaction(HRegionInfo regionInfo) { + return compactingRegions.contains(regionInfo); + } + + /** + * @param regionInfo for the region to run UPDATE STATISTICS command on. + * @return true if UPDATE STATISTICS wasn't already running on the region, false otherwise. + */ + public boolean addUpdateStatsCommandRegion(HRegionInfo regionInfo) { + return updateStatsRegions.add(regionInfo); + } + + /** + * @param regionInfo for the region to mark as not running UPDATE STATISTICS command on. + * @return true if UPDATE STATISTICS was running on the region, false otherwise. + */ + public boolean removeUpdateStatsCommandRegion(HRegionInfo regionInfo) { + return updateStatsRegions.remove(regionInfo); + } + + /** + * Enqueues the task for execution. 
+ * @param <T> + * @param c task to execute + */ + public <T> Future<T> runTask(Callable<T> c) { + return executor.submit(c); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 22fdf90..07fef1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -216,14 +216,14 @@ public class StatisticsCollector { } } - public InternalScanner createCompactionScanner(HRegion region, Store store, InternalScanner s, + public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner s, Pair<HRegionInfo, HRegionInfo> mergeRegions) throws IOException { // See if this is for Major compaction if (logger.isDebugEnabled()) { logger.debug("Compaction scanner created for stats"); } ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName()); - return getInternalScanner(region, store, s, cfKey, mergeRegions); + return getInternalScanner(env, store, s, cfKey, mergeRegions); } public void splitStats(HRegion parent, HRegion left, HRegion right) { @@ -245,9 +245,9 @@ public class StatisticsCollector { } } - protected InternalScanner getInternalScanner(HRegion region, Store store, InternalScanner internalScan, + protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner internalScan, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) { - return new StatisticsScanner(this, statsTable, region, internalScan, family, mergeRegions); + return new StatisticsScanner(this, statsTable, env, internalScan, family, mergeRegions); } public void clear() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 5c460e3..550a772 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -17,21 +17,28 @@ */ package org.apache.phoenix.schema.stats; +import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC; + import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Pair; import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + /** * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector} */ @@ -43,15 +50,18 @@ public class StatisticsScanner implements InternalScanner { private StatisticsCollector tracker; private ImmutableBytesPtr family; private Pair<HRegionInfo, HRegionInfo> mergeRegions; + private final Configuration config; - public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region, + public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env, InternalScanner delegate, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) { this.tracker = tracker; this.stats = stats; this.delegate = delegate; - this.region = region; + this.region = env.getRegion(); + this.config = env.getConfiguration(); this.family = family; this.mergeRegions = mergeRegions; + StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(region.getRegionInfo()); } @Override @@ -79,65 +89,82 @@ public class StatisticsScanner implements InternalScanner { tracker.collectStatistics(results); } } - - @Override - public void close() throws IOException { - IOException toThrow = null; - try { - // update the statistics table - // Just verify if this if fine - ArrayList<Mutation> mutations = new ArrayList<Mutation>(); - if (mergeRegions != null) { - if (mergeRegions.getFirst() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting stale stats for the region " - + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction"); + + private class StatisticsScannerCallable implements Callable<Void> { + @Override + public Void call() throws IOException { + IOException toThrow = null; + StatisticsCollectionRunTracker statsRunState = + StatisticsCollectionRunTracker.getInstance(config); + try { + // update the statistics table + // Just verify if this if fine + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + if (mergeRegions != null) { + if (mergeRegions.getFirst() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting stale stats for the region " + + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction"); + } + stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations); } - stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations); - } - if (mergeRegions.getSecond() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting stale stats for the region " - + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction"); + if (mergeRegions.getSecond() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting stale stats for the region " + + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction"); + } + stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations); } - stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations); } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); - } - stats.deleteStats(region.getRegionName(), this.tracker, family, mutations); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + } + 
stats.deleteStats(region.getRegionName(), tracker, family, mutations); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); - } - stats.addStats(region.getRegionName(), this.tracker, family, mutations); - if (LOG.isDebugEnabled()) { - LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + } + stats.addStats(region.getRegionName(), tracker, family, mutations); + if (LOG.isDebugEnabled()) { + LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); - } - stats.commitStats(mutations); - } catch (IOException e) { - LOG.error("Failed to update statistics table!", e); - toThrow = e; - } finally { - try { - stats.close(); + } + stats.commitStats(mutations); } catch (IOException e) { - if (toThrow == null) toThrow = e; - LOG.error("Error while closing the stats table", e); + LOG.error("Failed to update statistics table!", e); + toThrow = e; } finally { - // close the delegate scanner try { - delegate.close(); + statsRunState.removeCompactingRegion(region.getRegionInfo()); + stats.close(); } catch (IOException e) { if (toThrow == null) toThrow = e; - LOG.error("Error while closing the scanner", e); + LOG.error("Error while closing the stats table", e); } finally { - if (toThrow != null) { throw toThrow; } + // close the delegate scanner + try { + delegate.close(); + } catch (IOException e) { + if (toThrow == null) toThrow = e; + LOG.error("Error while closing the scanner", e); + } finally { + if (toThrow != null) { throw toThrow; } + } } } + return null; + } + } + + @Override + public void close() throws IOException { + boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + StatisticsScannerCallable callable = new StatisticsScannerCallable(); + if (!async) { + callable.call(); + } else { + StatisticsCollectionRunTracker.getInstance(config).runTask(callable); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index 8756568..3f187bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -24,11 +24,8 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; http://git-wip-us.apache.org/repos/asf/phoenix/blob/08aa07f5/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 5289ab9..215110c 100644 --- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -55,6 +55,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0; public static final boolean DEFAULT_EXPLAIN_CHUNK_COUNT = false; // TODO: update explain plans in test and set to true public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; + private static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = false; + private static final boolean DEFAULT_COMMIT_STATS_ASYNC = false; /** @@ -93,7 +95,9 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE) .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE) .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER) - .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS); + .setExtraJDBCArguments(DEFAULT_EXTRA_JDBC_ARGUMENTS) + .setRunUpdateStatsAsync(DEFAULT_RUN_UPDATE_STATS_ASYNC) + .setCommitStatsAsync(DEFAULT_COMMIT_STATS_ASYNC); } public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {
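
The StatisticsCollector hunk above changes createCompactionScanner() and getInternalScanner() to take the RegionCoprocessorEnvironment instead of a bare HRegion, so the scanner they build can read both the region and the server-side Configuration. A minimal caller-side sketch of the new call shape; the wrapper class and method names here are hypothetical illustration only:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.stats.StatisticsCollector;

public class CompactionScannerWiringSketch {
    public InternalScanner wrap(RegionCoprocessorEnvironment env, Store store,
            InternalScanner delegate, StatisticsCollector collector,
            Pair<HRegionInfo, HRegionInfo> mergeRegions) throws IOException {
        // Pass the coprocessor environment (not just env.getRegion()) so the
        // StatisticsScanner can pull the Configuration it needs for async commits.
        return collector.createCompactionScanner(env, store, delegate, mergeRegions);
    }
}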
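
In the StatisticsScanner hunk, the constructor now registers the region with StatisticsCollectionRunTracker via addCompactingRegion(), and the callable's finally block clears it with removeCompactingRegion() once the stats commit has run. A small sketch of that bracketing, assuming only the two tracker calls shown in the diff; the wrapper class itself is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;

public class CompactingRegionBracketSketch {
    private final Configuration config;
    private final HRegionInfo regionInfo;

    public CompactingRegionBracketSketch(Configuration config, HRegionInfo regionInfo) {
        this.config = config;
        this.regionInfo = regionInfo;
        // Mirrors the scanner constructor: mark the region as compacting up front so a
        // concurrent UPDATE STATISTICS run can detect the in-flight compaction.
        StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(regionInfo);
    }

    public void done() {
        // Mirrors the callable's finally block: always clear the mark, even after a failure.
        StatisticsCollectionRunTracker.getInstance(config).removeCompactingRegion(regionInfo);
    }
}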
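
The rewritten close() no longer performs the stats table writes inline unconditionally; it wraps them in StatisticsScannerCallable and either invokes it synchronously or hands it to the tracker's executor through runTask(), depending on the COMMIT_STATS_ASYNC setting. A sketch of that dispatch, using a stand-in Callable where the patch uses StatisticsScannerCallable:

import java.util.concurrent.Callable;

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;

public class AsyncCommitDispatchSketch {
    public void dispatch(Configuration config, Callable<Void> commitStats) throws Exception {
        boolean async = config.getBoolean(QueryServices.COMMIT_STATS_ASYNC,
                QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC);
        if (!async) {
            // Synchronous path: run inline so any IOException still surfaces from close().
            commitStats.call();
        } else {
            // Asynchronous path: submit to the tracker's executor; close() returns
            // immediately and the stats rows are committed in the background.
            StatisticsCollectionRunTracker.getInstance(config).runTask(commitStats);
        }
    }
}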
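
Finally, QueryServicesTestImpl pins both new switches to false so integration tests keep observing stats synchronously. A sketch of the resulting builder usage, assuming the usual QueryServicesOptions.withDefaults() entry point; only the two setters are confirmed by this commit:

import org.apache.phoenix.query.QueryServicesOptions;

public class TestStatsOptionsSketch {
    public static QueryServicesOptions deterministicStatsOptions() {
        return QueryServicesOptions.withDefaults()
                .setRunUpdateStatsAsync(false)  // UPDATE STATISTICS runs on the calling thread
                .setCommitStatsAsync(false);    // compaction-time stats commit runs inline
    }
}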
