Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 972ceb5c3 -> 508751847
LP-2692 Config setting for disabling stats Add configuration setting to allow disabling stats collection, for environments where it is not desired or is causing issues. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/50875184 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/50875184 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/50875184 Branch: refs/heads/4.x-HBase-0.98 Commit: 5087518478be465a7d10eb9d7f7981dd4d49eed2 Parents: 972ceb5 Author: Gabriel Reid <gabri...@ngdata.com> Authored: Fri Feb 19 14:37:27 2016 +0100 Committer: Gabriel Reid <gabri...@ngdata.com> Committed: Fri Feb 19 14:44:45 2016 +0100 ---------------------------------------------------------------------- .../end2end/StatsCollectionDisabledIT.java | 70 ++++++ .../UngroupedAggregateRegionObserver.java | 12 +- .../org/apache/phoenix/query/QueryServices.java | 1 + .../stats/DefaultStatisticsCollector.java | 221 +++++++++++++++++++ .../schema/stats/NoOpStatisticsCollector.java | 71 ++++++ .../schema/stats/StatisticsCollector.java | 212 +++--------------- .../stats/StatisticsCollectorFactory.java | 63 ++++++ .../phoenix/schema/stats/StatisticsScanner.java | 2 +- 8 files changed, 463 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java new file mode 100644 index 0000000..a92a665 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; +import java.util.Properties; + +import com.google.common.collect.Maps; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertFalse; + +/** + * Verifies that statistics are not collected if they are disabled via a setting + */ +public class StatsCollectionDisabledIT extends StatsCollectorAbstractIT { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + props.put(QueryServices.STATS_ENABLED_ATTRIB, Boolean.toString(false)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testStatisticsAreNotWritten() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE T1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)"); + stmt.execute("UPSERT INTO T1 VALUES (1, 'NAME1')"); + stmt.execute("UPSERT INTO T1 VALUES (2, 'NAME2')"); + stmt.execute("UPSERT INTO T1 VALUES (3, 'NAME3')"); + conn.commit(); + stmt.execute("UPDATE STATISTICS T1"); + ResultSet rs = stmt.executeQuery("SELECT * FROM SYSTEM.STATS"); + assertFalse(rs.next()); + rs.close(); + stmt.close(); + conn.close(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c8784f6..9540b00 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -89,6 +89,7 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; +import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PChar; @@ -184,9 +185,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] gp_per_region_bytes = scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION); // Let this throw, as this scan is being done for the sole purpose of collecting stats - StatisticsCollector statsCollector = - new StatisticsCollector(env, region.getRegionInfo().getTable() - .getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); + StatisticsCollector statsCollector = StatisticsCollectorFactory.createStatisticsCollector( + env, region.getRegionInfo().getTable().getNameAsString(), ts, + gp_width_bytes, gp_per_region_bytes); return collectStats(s, statsCollector, region, scan, env.getConfiguration()); } int offsetToBe = 0; @@ -610,8 +611,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver try { Pair<HRegionInfo, HRegionInfo> mergeRegions = null; long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); - StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(), - clientTimeStamp, store.getFamily().getName()); + StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( + c.getEnvironment(), table.getNameAsString(), clientTimeStamp, + store.getFamily().getName()); internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner); } catch (IOException e) { // If we can't reach the stats table, don't interrupt the normal http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 1fb2eca..de9b597 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -152,6 +152,7 @@ public interface QueryServices extends SQLCloseable { public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width"; public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region"; public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime"; + public static final String STATS_ENABLED_ATTRIB = "phoenix.stats.enabled"; public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async"; public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size"; public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java new file mode 100644 index 0000000..e7ba443 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.TimeKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +/** + * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and guideposts. + * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should honor that with + * timestamps for stats as well. The issue is for compaction, though. I don't know of a way for the user to specify any + * timestamp for that. Perhaps best to use current time across the board for now. + */ +public class DefaultStatisticsCollector implements StatisticsCollector { + private static final Logger logger = LoggerFactory.getLogger(DefaultStatisticsCollector.class); + + private long guidepostDepth; + private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; + private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap(); + protected StatisticsWriter statsTable; + private Pair<Long, GuidePostsInfoBuilder> cachedGps = null; + + DefaultStatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family, + byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException { + Configuration config = env.getConfiguration(); + int guidepostPerRegion = gp_per_region_bytes == null + ? config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION) + : PInteger.INSTANCE.getCodec().decodeInt(gp_per_region_bytes, 0, SortOrder.getDefault()); + long guidepostWidth = gp_width_bytes == null + ? config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES) + : PLong.INSTANCE.getCodec().decodeInt(gp_width_bytes, 0, SortOrder.getDefault()); + this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, + env.getRegion().getTableDesc()); + // Provides a means of clients controlling their timestamps to not use current time + // when background tasks are updating stats. Instead we track the max timestamp of + // the cells and use that. + boolean useCurrentTime = env.getConfiguration().getBoolean( + QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); + if (!useCurrentTime) { + clientTimeStamp = StatisticsCollector.NO_TIMESTAMP; + } + // Get the stats table associated with the current table on which the CP is + // triggered + this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp); + // in a compaction we know the one family ahead of time + if (family != null) { + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family); + cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder()); + guidePostsInfoWriterMap.put(cfKey, cachedGps); + } + } + + @Override + public long getMaxTimeStamp() { + return maxTimeStamp; + } + + @Override + public void close() throws IOException { + this.statsTable.close(); + } + + @Override + public void updateStatistic(HRegion region) { + try { + ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); + if (logger.isDebugEnabled()) { + logger.debug("Committing new stats for the region " + region.getRegionInfo()); + } + commitStats(mutations); + } catch (IOException e) { + logger.error("Unable to commit new stats", e); + } finally { + clear(); + } + } + + private void writeStatsToStatsTable(final HRegion region, boolean delete, List<Mutation> mutations, long currentTime) + throws IOException { + try { + // update the statistics table + for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) { + if (delete) { + if (logger.isDebugEnabled()) { + logger.debug("Deleting the stats for the region " + region.getRegionInfo()); + } + statsTable.deleteStats(region, this, fam, mutations); + } + if (logger.isDebugEnabled()) { + logger.debug("Adding new stats for the region " + region.getRegionInfo()); + } + statsTable.addStats(this, fam, mutations); + } + } catch (IOException e) { + logger.error("Failed to update statistics table!", e); + throw e; + } + } + + private void commitStats(List<Mutation> mutations) throws IOException { + statsTable.commitStats(mutations); + } + + /** + * Update the current statistics based on the latest batch of key-values from the underlying scanner + * + * @param results + * next batch of {@link KeyValue}s + */ + @Override + public void collectStatistics(final List<Cell> results) { + Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap(); + for (Cell cell : results) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp()); + Pair<Long, GuidePostsInfoBuilder> gps; + if (cachedGps == null) { + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(), + kv.getFamilyLength()); + gps = guidePostsInfoWriterMap.get(cfKey); + if (gps == null) { + gps = new Pair<Long, GuidePostsInfoBuilder>(0l, + new GuidePostsInfoBuilder()); + guidePostsInfoWriterMap.put(cfKey, gps); + } + if (famMap.get(cfKey) == null) { + famMap.put(cfKey, true); + gps.getSecond().incrementRowCount(); + } + } else { + gps = cachedGps; + cachedGps.getSecond().incrementRowCount(); + } + int kvLength = kv.getLength(); + long byteCount = gps.getFirst() + kvLength; + gps.setFirst(byteCount); + if (byteCount >= guidepostDepth) { + ImmutableBytesWritable row = new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); + if (gps.getSecond().addGuidePosts(row, byteCount, gps.getSecond().getRowCount())) { + gps.setFirst(0l); + gps.getSecond().resetRowCount(); + } + } + } + } + + @Override + public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, + InternalScanner s) throws IOException { + // See if this is for Major compaction + if (logger.isDebugEnabled()) { + logger.debug("Compaction scanner created for stats"); + } + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName()); + return getInternalScanner(env, store, s, cfKey); + } + + protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, + InternalScanner internalScan, ImmutableBytesPtr family) { + return new StatisticsScanner(this, statsTable, env, internalScan, family); + } + + @Override + public void clear() { + this.guidePostsInfoWriterMap.clear(); + maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; + } + + @Override + public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) { + Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam); + if (pair != null) { return pair.getSecond().build(); } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java new file mode 100644 index 0000000..4484924 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + +/** + * A drop-in statistics collector that does nothing. An instance of this class is used for tables + * or environments where statistics collection is disabled. + */ +public class NoOpStatisticsCollector implements StatisticsCollector { + + @Override + public long getMaxTimeStamp() { + return NO_TIMESTAMP; + } + + @Override + public void close() throws IOException { + // No-op + } + + @Override + public void updateStatistic(HRegion region) { + // No-op + } + + @Override + public void collectStatistics(List<Cell> results) { + // No-op + } + + @Override + public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, + InternalScanner delegate) throws IOException { + return delegate; + } + + @Override + public void clear() { + // No-op + } + + @Override + public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 5ef3bd0..cdd4f89 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -17,208 +17,54 @@ */ package org.apache.phoenix.schema.stats; +import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.schema.types.PLong; -import org.apache.phoenix.util.TimeKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; /** - * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and guideposts. - * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should honor that with - * timestamps for stats as well. The issue is for compaction, though. I don't know of a way for the user to specify any - * timestamp for that. Perhaps best to use current time across the board for now. + * Statistics tracker that helps to collect stats like min key, max key and guideposts. */ -public class StatisticsCollector { - private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class); - public static final long NO_TIMESTAMP = -1; - - private long guidepostDepth; - private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; - private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap(); - protected StatisticsWriter statsTable; - private Pair<Long, GuidePostsInfoBuilder> cachedGps = null; - - public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, - byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException { - this(env, tableName, clientTimeStamp, null, gp_width_bytes, gp_per_region_bytes); - } - - public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family) - throws IOException { - this(env, tableName, clientTimeStamp, family, null, null); - } - - private StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family, - byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException { - Configuration config = env.getConfiguration(); - int guidepostPerRegion = gp_per_region_bytes == null - ? config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION) - : PInteger.INSTANCE.getCodec().decodeInt(gp_per_region_bytes, 0, SortOrder.getDefault()); - long guidepostWidth = gp_width_bytes == null - ? config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES) - : PLong.INSTANCE.getCodec().decodeInt(gp_width_bytes, 0, SortOrder.getDefault()); - this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, - env.getRegion().getTableDesc()); - // Provides a means of clients controlling their timestamps to not use current time - // when background tasks are updating stats. Instead we track the max timestamp of - // the cells and use that. - boolean useCurrentTime = env.getConfiguration().getBoolean( - QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); - if (!useCurrentTime) { - clientTimeStamp = StatisticsCollector.NO_TIMESTAMP; - } - // Get the stats table associated with the current table on which the CP is - // triggered - this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp); - // in a compaction we know the one family ahead of time - if (family != null) { - ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family); - cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder()); - guidePostsInfoWriterMap.put(cfKey, cachedGps); - } - } +public interface StatisticsCollector extends Closeable { - public long getMaxTimeStamp() { - return maxTimeStamp; - } - - public void close() throws IOException { - this.statsTable.close(); - } - - public void updateStatistic(HRegion region) { - try { - ArrayList<Mutation> mutations = new ArrayList<Mutation>(); - writeStatsToStatsTable(region, true, mutations, TimeKeeper.SYSTEM.getCurrentTime()); - if (logger.isDebugEnabled()) { - logger.debug("Committing new stats for the region " + region.getRegionInfo()); - } - commitStats(mutations); - } catch (IOException e) { - logger.error("Unable to commit new stats", e); - } finally { - clear(); - } - } - - private void writeStatsToStatsTable(final HRegion region, boolean delete, List<Mutation> mutations, long currentTime) - throws IOException { - try { - // update the statistics table - for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) { - if (delete) { - if (logger.isDebugEnabled()) { - logger.debug("Deleting the stats for the region " + region.getRegionInfo()); - } - statsTable.deleteStats(region, this, fam, mutations); - } - if (logger.isDebugEnabled()) { - logger.debug("Adding new stats for the region " + region.getRegionInfo()); - } - statsTable.addStats(this, fam, mutations); - } - } catch (IOException e) { - logger.error("Failed to update statistics table!", e); - throw e; - } - } - - private void commitStats(List<Mutation> mutations) throws IOException { - statsTable.commitStats(mutations); - } + /** Constant used if no max timestamp is available */ + long NO_TIMESTAMP = -1; /** - * Update the current statistics based on the latest batch of key-values from the underlying scanner - * - * @param results - * next batch of {@link KeyValue}s + * Returns the maximum timestamp of all cells encountered while collecting statistics. */ - public void collectStatistics(final List<Cell> results) { - Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap(); - for (Cell cell : results) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp()); - Pair<Long, GuidePostsInfoBuilder> gps; - if (cachedGps == null) { - ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(), - kv.getFamilyLength()); - gps = guidePostsInfoWriterMap.get(cfKey); - if (gps == null) { - gps = new Pair<Long, GuidePostsInfoBuilder>(0l, - new GuidePostsInfoBuilder()); - guidePostsInfoWriterMap.put(cfKey, gps); - } - if (famMap.get(cfKey) == null) { - famMap.put(cfKey, true); - gps.getSecond().incrementRowCount(); - } - } else { - gps = cachedGps; - cachedGps.getSecond().incrementRowCount(); - } - int kvLength = kv.getLength(); - long byteCount = gps.getFirst() + kvLength; - gps.setFirst(byteCount); - if (byteCount >= guidepostDepth) { - ImmutableBytesWritable row = new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); - if (gps.getSecond().addGuidePosts(row, byteCount, gps.getSecond().getRowCount())) { - gps.setFirst(0l); - gps.getSecond().resetRowCount(); - } - } - } - } + long getMaxTimeStamp(); - public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner s) throws IOException { - // See if this is for Major compaction - if (logger.isDebugEnabled()) { - logger.debug("Compaction scanner created for stats"); - } - ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName()); - return getInternalScanner(env, store, s, cfKey); - } + /** + * Write the collected statistics for the given region. + */ + void updateStatistic(HRegion region); - protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, - InternalScanner internalScan, ImmutableBytesPtr family) { - return new StatisticsScanner(this, statsTable, env, internalScan, family); - } + /** + * Collect statistics for the given list of cells. This method can be called multiple times + * during collection of statistics. + */ + void collectStatistics(List<Cell> results); - public void clear() { - this.guidePostsInfoWriterMap.clear(); - maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; - } + /** + * Wrap a compaction scanner with a scanner that will collect statistics using this instance. + */ + InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store, + InternalScanner delegate) throws IOException; - public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) { - Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam); - if (pair != null) { return pair.getSecond().build(); } - return null; - } + /** + * Clear all statistics information that has been collected. + */ + void clear(); + /** + * Retrieve the calculated guide post info for the given column family. + */ + GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java new file mode 100644 index 0000000..a21796e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import java.io.IOException; + +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.query.QueryServices; + +/** + * Provides new {@link DefaultStatisticsCollector} instances based on configuration settings for a + * table (or system-wide configuration of statistics). + */ +public class StatisticsCollectorFactory { + + public static StatisticsCollector createStatisticsCollector(RegionCoprocessorEnvironment env, + String tableName, long clientTimestamp, byte[] guidepostWidthBytes, + byte[] guidepostsPerRegionBytes) throws IOException { + if (statisticsEnabled(env)) { + return new DefaultStatisticsCollector(env, tableName, clientTimestamp, null, + guidepostWidthBytes, guidepostsPerRegionBytes); + } else { + return new NoOpStatisticsCollector(); + } + } + + public static StatisticsCollector createStatisticsCollector( + RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, + byte[] storeName) throws IOException { + if (statisticsEnabled(env)) { + return new DefaultStatisticsCollector(env, tableName, clientTimeStamp, storeName, + null, null); + } else { + return new NoOpStatisticsCollector(); + } + } + + /** + * Determines if statistics are enabled (which is the default). This is done on the + * RegionCoprocessorEnvironment for now to allow setting this on a per-table basis, although + * it could be moved to the general table metadata in the future if there is a realistic + * use case for that. + */ + private static boolean statisticsEnabled(RegionCoprocessorEnvironment env) { + return env.getConfiguration().getBoolean(QueryServices.STATS_ENABLED_ATTRIB, true); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/50875184/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 36c2744..4e6a18f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -38,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** - * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector} + * The scanner that does the scanning to collect the stats during major compaction.{@link DefaultStatisticsCollector} */ public class StatisticsScanner implements InternalScanner { private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);