Repository: phoenix Updated Branches: refs/heads/4.0 e0996ade7 -> e49e8dcfb
PHOENIX-1320 Update stats atomically Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8c054b2a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8c054b2a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8c054b2a Branch: refs/heads/4.0 Commit: 8c054b2ae9f59c9430d75b0bf1b1c269714db395 Parents: e0996ad Author: James Taylor <jtay...@salesforce.com> Authored: Sat Oct 4 16:54:36 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Sat Oct 4 16:54:36 2014 -0700 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 58 ++++++++------- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 +- .../query/ConnectionQueryServicesImpl.java | 44 ++++------- .../apache/phoenix/query/QueryConstants.java | 5 +- .../schema/stat/StatisticsCollector.java | 66 +++++++++-------- .../phoenix/schema/stat/StatisticsScanner.java | 34 ++++++--- .../phoenix/schema/stat/StatisticsTable.java | 78 ++++++++++++-------- .../phoenix/schema/stat/StatisticsUtils.java | 8 -- .../org/apache/phoenix/util/SchemaUtil.java | 5 ++ 9 files changed, 166 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 0bf2710..4ddb322 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -34,8 +34,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; @@ -83,7 +81,6 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.stat.StatisticsCollector; -import org.apache.phoenix.schema.stat.StatisticsTable; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; @@ -116,8 +113,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ public static final String EMPTY_CF = "EmptyCF"; private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); private KeyValueBuilder kvBuilder; - private static final Log LOG = LogFactory.getLog(UngroupedAggregateRegionObserver.class); - private StatisticsTable statsTable = null; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -125,8 +120,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // Can't use ClientKeyValueBuilder on server-side because the memstore expects to // be able to get a single backing buffer for a KeyValue. this.kvBuilder = GenericKeyValueBuilder.INSTANCE; - String name = ((RegionCoprocessorEnvironment)e).getRegion().getTableDesc().getTableName().getNameAsString(); - this.statsTable = StatisticsTable.getStatisticsTableForCoprocessor(e.getConfiguration(), name); } private static void commitBatch(HRegion region, List<Mutation> mutations, byte[] indexUUID) throws IOException { @@ -161,12 +154,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { int offset = 0; - boolean isAnalyze = false; HRegion region = c.getEnvironment().getRegion(); StatisticsCollector stats = null; - if(ScanUtil.isAnalyzeTable(scan) && statsTable != null) { - stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration()); - isAnalyze = true; + if(ScanUtil.isAnalyzeTable(scan)) { + // Let this throw, as this scan is being done for the sole purpose of collecting stats + stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString()); } if (ScanUtil.isLocalIndex(scan)) { /* @@ -260,7 +252,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ // since this is an indication of whether or not there are more values after the // ones returned hasMore = innerScanner.nextRaw(results); - if(isAnalyze && stats != null) { + if(stats != null) { stats.collectStatistics(results); } @@ -383,13 +375,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } while (hasMore); } finally { try { - if (isAnalyze && stats != null) { - stats.updateStatistic(region); - stats.clear(); + if (stats != null) { + try { + stats.updateStatistic(region); + } finally { + stats.close(); + } } - innerScanner.close(); } finally { - region.closeRegionOperation(); + try { + innerScanner.close(); + } finally { + region.closeRegionOperation(); + } } } @@ -458,9 +456,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) && scanType.equals(ScanType.COMPACT_DROP_DELETES)) { - StatisticsCollector stats = new StatisticsCollector(statsTable, c.getEnvironment().getConfiguration()); - internalScan = - stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s); + try { + // TODO: when does this get closed? + StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString()); + internalScan = + stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s); + } catch (IOException e) { + // If we can't reach the stats table, don't interrupt the normal + // compaction operation, just log a warning. + if(logger.isWarnEnabled()) { + logger.warn("Unable to collect stats for " + table, e); + } + } } return internalScan; } @@ -472,15 +479,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ HRegion region = e.getEnvironment().getRegion(); TableName table = region.getRegionInfo().getTable(); if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { + StatisticsCollector stats = null; try { - StatisticsCollector stats = new StatisticsCollector(statsTable, e.getEnvironment() - .getConfiguration()); + stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString()); stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region); - stats.clear(); } catch (IOException ioe) { - if(LOG.isDebugEnabled()) { - LOG.debug("Error while collecting stats during split ",ioe); + if(logger.isWarnEnabled()) { + logger.warn("Error while collecting stats during split for " + table,ioe); } + } finally { + if (stats != null) stats.close(); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index a800fd9..7afeb6e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -107,9 +107,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final String SYSTEM_CATALOG_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE); public static final byte[] SYSTEM_CATALOG_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_CATALOG_SCHEMA_BYTES); public static final String SYSTEM_STATS_TABLE = "STATS"; - public static final byte[] SYSTEM_STATS_BYTES = Bytes.toBytes(SYSTEM_STATS_TABLE); public static final String SYSTEM_STATS_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_STATS_TABLE); - public static final byte[] SYSTEM_STATS_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_STATS_BYTES); + public static final byte[] SYSTEM_STATS_NAME_BYTES = Bytes.toBytes(SYSTEM_STATS_NAME); public static final String SYSTEM_CATALOG_ALIAS = "\"SYSTEM.TABLE\""; http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index c8fd90b..fd60eb8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -103,7 +104,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.schema.EmptySequenceCacheException; -import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; @@ -590,13 +590,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); } // TODO: better encapsulation for this - // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also, - // don't install on the metadata table until we fix the TODO there. - if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) && !SchemaUtil.isMetaTable(tableName) && !descriptor.hasCoprocessor(Indexer.class.getName())) { + // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. + // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use + // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318). + if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) + && !SchemaUtil.isMetaTable(tableName) + && !SchemaUtil.isStatsTable(tableName) + && !descriptor.hasCoprocessor(Indexer.class.getName())) { Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts); } + if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) { + descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(), + null, 1, null); + } if (descriptor.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(descriptor @@ -730,12 +738,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HTableDescriptor newDesc = generateTableDescriptor(tableName, existingDesc, tableType , props, families, splits); if (!tableExist) { - /* - * Remove the splitPolicy attribute due to an HBase bug (see below) - */ - if (isMetaTable) { - newDesc.remove(HTableDescriptor.SPLIT_POLICY); - } if (newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) != null && Boolean.TRUE.equals(PDataType.BOOLEAN.toObject(newDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) { newDesc.setValue(HTableDescriptor.SPLIT_POLICY, IndexRegionSplitPolicy.class.getName()); } @@ -752,31 +754,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (isMetaTable) { checkClientServerCompatibility(); - /* - * Now we modify the table to add the split policy, since we know that the client and - * server and compatible. This works around a nasty, known HBase bug where if a split - * policy class cannot be found on the server, the HBase table is left in a horrible - * "ghost" state where it can't be used and can't be deleted without bouncing the master. - */ - newDesc.setValue(HTableDescriptor.SPLIT_POLICY, MetaDataSplitPolicy.class.getName()); - admin.disableTable(tableName); - admin.modifyTable(tableName, newDesc); - admin.enableTable(tableName); } return null; } else { - if (!modifyExistingMetaData || existingDesc.equals(newDesc)) { - // Table is already created. Note that the presplits are ignored in this case - if (isMetaTable) { - checkClientServerCompatibility(); - } - return existingDesc; - } - if (isMetaTable) { checkClientServerCompatibility(); } + if (!modifyExistingMetaData || existingDesc.equals(newDesc)) { + return existingDesc; + } + // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true admin.disableTable(tableName); admin.modifyTable(tableName, newDesc); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index bbc653e..84bb516 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -226,6 +226,7 @@ public interface QueryConstants { "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + + // Install split policy to prevent a tenant's metadata from being split across regions. HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; public static final String CREATE_STATS_TABLE_METADATA = @@ -241,7 +242,9 @@ public interface QueryConstants { "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + PHYSICAL_NAME + "," + COLUMN_FAMILY + ","+ REGION_NAME+"))\n" + - HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + + // TODO: should we support versioned stats? + // HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + + // Install split policy to prevent a physical table's stats from being split across regions. HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; public static final String CREATE_SEQUENCE_METADATA = http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java index 82ae309..bb05a32 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java @@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PDataType; @@ -51,7 +54,11 @@ import com.google.common.collect.Maps; /** * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and - * guideposts + * guideposts. + * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should + * honor that with timestamps for stats as well. The issue is for compaction, though. I don't know of + * a way for the user to specify any timestamp for that. Perhaps best to use current time across the + * board for now. */ public class StatisticsCollector { @@ -64,13 +71,19 @@ public class StatisticsCollector { // Ensures that either analyze or compaction happens at any point of time. private static final Log LOG = LogFactory.getLog(StatisticsCollector.class); - public StatisticsCollector(StatisticsTable statsTable, Configuration conf) throws IOException { + public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName) throws IOException { + guidepostDepth = + env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, + QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); // Get the stats table associated with the current table on which the CP is // triggered - this.statsTable = statsTable; - guidepostDepth = - conf.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, - QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); + this.statsTable = StatisticsTable.getStatisticsTable( + env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES))); + this.statsTable.commitLastStatsUpdatedTime(tableName, TimeKeeper.SYSTEM.getCurrentTime()); + } + + public void close() throws IOException { + this.statsTable.close(); } public void updateStatistic(HRegion region) { @@ -119,10 +132,11 @@ public class StatisticsCollector { private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException { try { + String tableName = region.getRegionInfo().getTable().getNameAsString(); + String regionName = region.getRegionInfo().getRegionNameAsString(); // update the statistics table for (ImmutableBytesPtr fam : familyMap.keySet()) { - String tableName = region.getRegionInfo().getTable().getNameAsString(); - statsTable.deleteStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, + statsTable.deleteStats(tableName, regionName, this, Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); } } catch (IOException e) { @@ -147,7 +161,7 @@ public class StatisticsCollector { } /** - * Update the current statistics based on the lastest batch of key-values from the underlying scanner + * Update the current statistics based on the latest batch of key-values from the underlying scanner * * @param results * next batch of {@link KeyValue}s @@ -188,24 +202,19 @@ public class StatisticsCollector { public void collectStatsDuringSplit(Configuration conf, HRegion l, HRegion r, HRegion region) { try { - if (familyMap != null) { - familyMap.clear(); - } // Create a delete operation on the parent region // Then write the new guide posts for individual regions - // TODO : Try making this atomic List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3); long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); + deleteStatsFromStatsTable(region, mutations, currentTime); if (LOG.isDebugEnabled()) { LOG.debug("Collecting stats for the daughter region " + l.getRegionInfo()); } - collectStatsForSplitRegions(conf, l, region, true, mutations, currentTime); - clear(); + collectStatsForSplitRegions(conf, l, mutations, currentTime); if (LOG.isDebugEnabled()) { LOG.debug("Collecting stats for the daughter region " + r.getRegionInfo()); } - collectStatsForSplitRegions(conf, r, region, false, mutations, currentTime); - clear(); + collectStatsForSplitRegions(conf, r, mutations, currentTime); if (LOG.isDebugEnabled()) { LOG.debug("Committing stats for the daughter regions as part of split " + r.getRegionInfo()); } @@ -216,32 +225,29 @@ public class StatisticsCollector { } } - private void collectStatsForSplitRegions(Configuration conf, HRegion daughter, HRegion parent, boolean delete, + private void collectStatsForSplitRegions(Configuration conf, HRegion daughter, List<Mutation> mutations, long currentTime) throws IOException { + IOException toThrow = null; + clear(); Scan scan = createScan(conf); RegionScanner scanner = null; int count = 0; try { scanner = daughter.getScanner(scan); count = scanRegion(scanner, count); + writeStatsToStatsTable(daughter, false, mutations, currentTime); } catch (IOException e) { LOG.error(e); - throw e; + toThrow = e; } finally { - if (scanner != null) { try { - if (delete) { - if (LOG.isDebugEnabled()) { - LOG.debug("Deleting the stats for the parent region " + parent.getRegionInfo()); - } - deleteStatsFromStatsTable(parent, mutations, currentTime); - } - writeStatsToStatsTable(daughter, false, mutations, currentTime); + if (scanner != null) scanner.close(); } catch (IOException e) { LOG.error(e); - throw e; + if (toThrow != null) toThrow = e; + } finally { + if (toThrow != null) throw toThrow; } - } } } @@ -256,7 +262,7 @@ public class StatisticsCollector { protected InternalScanner getInternalScanner(HRegion region, Store store, InternalScanner internalScan, String family) { - return new StatisticsScanner(this, statsTable, region.getRegionInfo(), internalScan, + return new StatisticsScanner(this, statsTable, region, internalScan, Bytes.toBytes(family)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java index 86ffca7..79f64fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java @@ -15,10 +15,10 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.util.SchemaUtil; @@ -31,11 +31,11 @@ public class StatisticsScanner implements InternalScanner { private static final Log LOG = LogFactory.getLog(StatisticsScanner.class); private InternalScanner delegate; private StatisticsTable stats; - private HRegionInfo region; + private HRegion region; private StatisticsCollector tracker; private byte[] family; - public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegionInfo region, + public StatisticsScanner(StatisticsCollector tracker, StatisticsTable stats, HRegion region, InternalScanner delegate, byte[] family) { // should there be only one tracker? this.tracker = tracker; @@ -75,12 +75,13 @@ public class StatisticsScanner implements InternalScanner { } } + @Override public void close() throws IOException { IOException toThrow = null; try { // update the statistics table // Just verify if this if fine - String tableName = SchemaUtil.getTableNameFromFullName(region.getTable().getNameAsString()); + String tableName = SchemaUtil.getTableNameFromFullName(region.getRegionInfo().getTable().getNameAsString()); ArrayList<Mutation> mutations = new ArrayList<Mutation>(); long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); if (LOG.isDebugEnabled()) { @@ -103,12 +104,25 @@ public class StatisticsScanner implements InternalScanner { } catch (IOException e) { LOG.error("Failed to update statistics table!", e); toThrow = e; - } - // close the delegate scanner - try { - delegate.close(); - } catch (IOException e) { - LOG.error("Error while closing the scanner"); + } finally { + try { + stats.close(); + } catch (IOException e) { + if (toThrow == null) toThrow = e; + LOG.error("Error while closing the stats table", e); + } finally { + // close the delegate scanner + try { + delegate.close(); + } catch (IOException e) { + if (toThrow == null) toThrow = e; + LOG.error("Error while closing the scanner", e); + } finally { + if (toThrow != null) { + throw toThrow; + } + } + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java index bc769e3..ebaa978 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java @@ -20,28 +20,30 @@ package org.apache.phoenix.schema.stat; import java.io.Closeable; import java.io.IOException; import java.sql.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; +import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; + +import com.google.protobuf.ServiceException; /** * Wrapper to access the statistics table SYSTEM.STATS using the HTable. */ -@SuppressWarnings("deprecation") public class StatisticsTable implements Closeable { - /** Map of the currently open statistics tables */ - private static final Map<String, StatisticsTable> tableMap = new HashMap<String, StatisticsTable>(); /** * @param Configuration * Configruation to update the stats table. @@ -51,20 +53,8 @@ public class StatisticsTable implements Closeable { * @throws IOException * if the table cannot be created due to an underlying HTable creation error */ - public synchronized static StatisticsTable getStatisticsTableForCoprocessor(Configuration conf, - String primaryTableName) throws IOException { - StatisticsTable table = tableMap.get(primaryTableName); - if (table == null) { - // Map the statics table and the table with which the statistics is - // associated. This is a workaround - HTablePool pool = new HTablePool(conf,100); - //HTable hTable = new HTable(conf, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); - HTableInterface hTable = pool.getTable(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); - //h.setAutoFlushTo(true); - table = new StatisticsTable(hTable); - tableMap.put(primaryTableName, table); - } - return table; + public static StatisticsTable getStatisticsTable(HTableInterface hTable) throws IOException { + return new StatisticsTable(hTable); } private final HTableInterface statisticsTable; @@ -102,21 +92,39 @@ public class StatisticsTable implements Closeable { if (tracker == null) { return; } // Add the timestamp header - formLastUpdatedStatsMutation(tableName, currentTime, mutations); + commitLastStatsUpdatedTime(tableName, currentTime); byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), PDataType.VARCHAR.toBytes(regionName)); formStatsUpdateMutation(tracker, fam, mutations, currentTime, prefix); } + private static MutationType getMutationType(Mutation m) throws IOException { + if (m instanceof Put) { + return MutationType.PUT; + } else if (m instanceof Delete) { + return MutationType.DELETE; + } else { + throw new DoNotRetryIOException("Unsupported mutation type in stats commit" + + m.getClass().getName()); + } + } public void commitStats(List<Mutation> mutations) throws IOException { - Object[] res = new Object[mutations.size()]; - try { - if (mutations.size() > 0) { - statisticsTable.batch(mutations, res); + if (mutations.size() > 0) { + byte[] row = mutations.get(0).getRow(); + MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); + for (Mutation m : mutations) { + mrmBuilder.addMutationRequest(ProtobufUtil.toMutation(getMutationType(m), m)); + } + MutateRowsRequest mrm = mrmBuilder.build(); + CoprocessorRpcChannel channel = statisticsTable.coprocessorService(row); + MultiRowMutationService.BlockingInterface service = + MultiRowMutationService.newBlockingStub(channel); + try { + service.mutateRows(null, mrm); + } catch (ServiceException ex) { + ProtobufUtil.toIOException(ex); } - } catch (InterruptedException e) { - throw new IOException("Exception while adding deletes and puts"); } } @@ -137,12 +145,20 @@ public class StatisticsTable implements Closeable { mutations.add(put); } - private void formLastUpdatedStatsMutation(String tableName, long currentTime, List<Mutation> mutations) throws IOException { - byte[] prefix = StatisticsUtils.getRowKeyForTSUpdate(PDataType.VARCHAR.toBytes(tableName)); + public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException { + // always starts with the source table + TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length); + os.write(table); + os.close(); + return os.getBuffer(); + } + + public void commitLastStatsUpdatedTime(String tableName, long currentTime) throws IOException { + byte[] prefix = PDataType.VARCHAR.toBytes(tableName); Put put = new Put(prefix); put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, currentTime, PDataType.DATE.toBytes(new Date(currentTime))); - mutations.add(put); + statisticsTable.put(put); } public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam, http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java index 7cb3a38..8b6d7fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java @@ -48,14 +48,6 @@ public class StatisticsUtils { return os.getBuffer(); } - public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException { - // always starts with the source table - TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length); - os.write(table); - os.close(); - return os.getBuffer(); - } - public static byte[] getCFFromRowKey(byte[] table, byte[] row, int rowOffset, int rowLength) { // Move over the the sepeartor byte that would be written after the table name int startOff = Bytes.indexOf(row, table) + (table.length) + 1; http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c054b2a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index c0ee92b..5cc861b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -18,6 +18,7 @@ package org.apache.phoenix.util; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES; import java.sql.SQLException; import java.sql.Statement; @@ -344,6 +345,10 @@ public class SchemaUtil { return Bytes.compareTo(tableName, SYSTEM_CATALOG_NAME_BYTES) == 0; } + public static boolean isStatsTable(byte[] tableName) { + return Bytes.compareTo(tableName, SYSTEM_STATS_NAME_BYTES) == 0; + } + public static boolean isSequenceTable(byte[] tableName) { return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SEQUENCE_TABLE_NAME_BYTES) == 0; }