PHOENIX-1427 Reduce work in StatsCollector
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/680e9fd6 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/680e9fd6 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/680e9fd6 Branch: refs/heads/4.2 Commit: 680e9fd6862a39e1bbb2b5b8162eabbfa580d5a8 Parents: cf0d07b Author: James Taylor <jtay...@salesforce.com> Authored: Wed Nov 12 13:55:29 2014 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Nov 12 17:09:58 2014 -0800 ---------------------------------------------------------------------- .../schema/stats/StatisticsCollector.java | 66 ++++++-------------- .../phoenix/schema/stats/StatisticsScanner.java | 10 +-- .../phoenix/schema/stats/StatisticsUtil.java | 11 ++-- .../phoenix/schema/stats/StatisticsWriter.java | 40 +++++------- 4 files changed, 44 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/680e9fd6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 9c85e63..4123ebe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -59,11 +59,9 @@ public class StatisticsCollector { private static final Logger logger = LoggerFactory.getLogger(StatisticsCollector.class); public static final long NO_TIMESTAMP = -1; - private Map<String, byte[]> minMap = Maps.newHashMap(); - private Map<String, byte[]> maxMap = Maps.newHashMap(); private long guidepostDepth; private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; - private Map<String, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap(); + private Map<ImmutableBytesPtr, Pair<Long,GuidePostsInfo>> guidePostsMap = Maps.newHashMap(); // Tracks the bytecount per family if it has reached the guidePostsDepth private Map<ImmutableBytesPtr, Boolean> familyMap = Maps.newHashMap(); protected StatisticsWriter statsTable; @@ -112,13 +110,13 @@ public class StatisticsCollector { if(logger.isDebugEnabled()) { logger.debug("Deleting the stats for the region "+region.getRegionInfo()); } - statsTable.deleteStats(region.getRegionInfo().getRegionName(), this, Bytes.toString(fam.copyBytesIfNecessary()), + statsTable.deleteStats(region.getRegionInfo().getRegionName(), this, fam, mutations); } if(logger.isDebugEnabled()) { logger.debug("Adding new stats for the region "+region.getRegionInfo()); } - statsTable.addStats((region.getRegionInfo().getRegionName()), this, Bytes.toString(fam.copyBytesIfNecessary()), + statsTable.addStats((region.getRegionInfo().getRegionName()), this, fam, mutations); } } catch (IOException e) { @@ -149,7 +147,9 @@ public class StatisticsCollector { if (logger.isDebugEnabled()) { logger.debug("Compaction scanner created for stats"); } - return getInternalScanner(region, store, s, store.getColumnFamilyName()); + // FIXME: no way to get cf as byte[] ? + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(Bytes.toBytes(store.getColumnFamilyName())); + return getInternalScanner(region, store, s, cfKey); } public void splitStats(HRegion parent, HRegion left, HRegion right) { @@ -159,7 +159,7 @@ public class StatisticsCollector { } List<Mutation> mutations = Lists.newArrayListWithExpectedSize(3); for (byte[] fam : parent.getStores().keySet()) { - statsTable.splitStats(parent, left, right, this, Bytes.toString(fam), mutations); + statsTable.splitStats(parent, left, right, this, new ImmutableBytesPtr(fam), mutations); } if (logger.isDebugEnabled()) { logger.debug("Committing stats for the daughter regions as part of split " + parent.getRegionInfo()); @@ -172,22 +172,19 @@ public class StatisticsCollector { } protected InternalScanner getInternalScanner(HRegion region, Store store, - InternalScanner internalScan, String family) { - return new StatisticsScanner(this, statsTable, region, internalScan, - Bytes.toBytes(family)); + InternalScanner internalScan, ImmutableBytesPtr family) { + return new StatisticsScanner(this, statsTable, region, internalScan, family); } public void clear() { - this.maxMap.clear(); - this.minMap.clear(); this.guidePostsMap.clear(); this.familyMap.clear(); maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; } - public void addGuidePost(String fam, GuidePostsInfo info, long byteSize, long timestamp) { + public void addGuidePost(ImmutableBytesPtr cfKey, GuidePostsInfo info, long byteSize, long timestamp) { Pair<Long,GuidePostsInfo> newInfo = new Pair<Long,GuidePostsInfo>(byteSize,info); - Pair<Long,GuidePostsInfo> oldInfo = guidePostsMap.put(fam, newInfo); + Pair<Long,GuidePostsInfo> oldInfo = guidePostsMap.put(cfKey, newInfo); if (oldInfo != null) { info.combine(oldInfo.getSecond()); newInfo.setFirst(oldInfo.getFirst() + newInfo.getFirst()); @@ -195,56 +192,31 @@ public class StatisticsCollector { maxTimeStamp = Math.max(maxTimeStamp, timestamp); } + @SuppressWarnings("deprecation") public void updateStatistic(KeyValue kv) { - @SuppressWarnings("deprecation") - byte[] cf = kv.getFamily(); - familyMap.put(new ImmutableBytesPtr(cf), true); + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength()); + familyMap.put(cfKey, true); - String fam = Bytes.toString(cf); - byte[] row = ByteUtil.copyKeyBytesIfNecessary( - new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - if (!minMap.containsKey(fam) && !maxMap.containsKey(fam)) { - minMap.put(fam, row); - // Ideally the max key also should be added in this case - maxMap.put(fam, row); - } else { - if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), minMap.get(fam), 0, - minMap.get(fam).length) < 0) { - minMap.put(fam, row); - } - if (Bytes.compareTo(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), maxMap.get(fam), 0, - maxMap.get(fam).length) > 0) { - maxMap.put(fam, row); - } - } maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp()); // TODO : This can be moved to an interface so that we could collect guide posts in different ways - Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(fam); + Pair<Long,GuidePostsInfo> gps = guidePostsMap.get(cfKey); if (gps == null) { gps = new Pair<Long,GuidePostsInfo>(0L,new GuidePostsInfo(0, Collections.<byte[]>emptyList())); - guidePostsMap.put(fam, gps); + guidePostsMap.put(cfKey, gps); } int kvLength = kv.getLength(); long byteCount = gps.getFirst() + kvLength; gps.setFirst(byteCount); if (byteCount >= guidepostDepth) { + byte[] row = ByteUtil.copyKeyBytesIfNecessary( + new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); if (gps.getSecond().addGuidePost(row, byteCount)) { gps.setFirst(0L); } } } - public byte[] getMaxKey(String fam) { - if (maxMap.get(fam) != null) { return maxMap.get(fam); } - return null; - } - - public byte[] getMinKey(String fam) { - if (minMap.get(fam) != null) { return minMap.get(fam); } - return null; - } - - public GuidePostsInfo getGuidePosts(String fam) { + public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) { Pair<Long,GuidePostsInfo> pair = guidePostsMap.get(fam); if (pair != null) { return pair.getSecond(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/680e9fd6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index fa3930d..51b6a6b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -20,7 +20,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** * The scanner that does the scanning to collect the stats during major compaction.{@link StatisticsCollector} @@ -31,10 +31,10 @@ public class StatisticsScanner implements InternalScanner { private StatisticsWriter stats; private HRegion region; private StatisticsCollector tracker; - private byte[] family; + private ImmutableBytesPtr family; public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region, - InternalScanner delegate, byte[] family) { + InternalScanner delegate, ImmutableBytesPtr family) { this.tracker = tracker; this.stats = stats; this.delegate = delegate; @@ -83,12 +83,12 @@ public class StatisticsScanner implements InternalScanner { LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.deleteStats(region.getRegionName(), this.tracker, Bytes.toString(family), mutations); + stats.deleteStats(region.getRegionName(), this.tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.addStats(region.getRegionName(), this.tracker, Bytes.toString(family), mutations); + stats.addStats(region.getRegionName(), this.tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/680e9fd6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index 2a7047f..bf9d80e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.util.ByteUtil; @@ -50,15 +51,15 @@ public class StatisticsUtil { /** Number of parts in our complex key */ protected static final int NUM_KEY_PARTS = 3; - public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) { + public static byte[] getRowKey(byte[] table, ImmutableBytesPtr fam, byte[] region) { // always starts with the source table - byte[] rowKey = new byte[table.length + fam.length + region.length + 2]; + byte[] rowKey = new byte[table.length + fam.getLength() + region.length + 2]; int offset = 0; System.arraycopy(table, 0, rowKey, offset, table.length); offset += table.length; rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; - System.arraycopy(fam, 0, rowKey, offset, fam.length); - offset += fam.length; + System.arraycopy(fam.get(), fam.getOffset(), rowKey, offset, fam.getLength()); + offset += fam.getLength(); rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; System.arraycopy(region, 0, rowKey, offset, region.length); return rowKey; @@ -68,7 +69,7 @@ public class StatisticsUtil { return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength()); } - public static Result readRegionStatistics(HTableInterface statsHTable, byte[] tableNameBytes, byte[] cf, byte[] regionName, long clientTimeStamp) + public static Result readRegionStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf, byte[] regionName, long clientTimeStamp) throws IOException { byte[] prefix = StatisticsUtil.getRowKey(tableNameBytes, cf, regionName); Get get = new Get(prefix); http://git-wip-us.apache.org/repos/asf/phoenix/blob/680e9fd6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index f70c327..9b6efc9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRo import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; @@ -102,15 +103,14 @@ public class StatisticsWriter implements Closeable { statsWriterTable.close(); } - public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { + public void splitStats(HRegion p, HRegion l, HRegion r, StatisticsCollector tracker, ImmutableBytesPtr cfKey, List<Mutation> mutations) throws IOException { if (tracker == null) { return; } boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP; if (!useMaxTimeStamp) { mutations.add(getLastStatsUpdatedTimePut(clientTimeStamp)); } long readTimeStamp = useMaxTimeStamp ? HConstants.LATEST_TIMESTAMP : clientTimeStamp; - byte[] famBytes = PDataType.VARCHAR.toBytes(fam); - Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, famBytes, p.getRegionName(), readTimeStamp); + Result result = StatisticsUtil.readRegionStatistics(statsReaderTable, tableName, cfKey, p.getRegionName(), readTimeStamp); if (result != null && !result.isEmpty()) { Cell cell = result.getColumnLatestCell(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES); @@ -118,7 +118,7 @@ public class StatisticsWriter implements Closeable { long writeTimeStamp = useMaxTimeStamp ? cell.getTimestamp() : clientTimeStamp; GuidePostsInfo guidePosts = GuidePostsInfo.fromBytes(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - byte[] pPrefix = StatisticsUtil.getRowKey(tableName, famBytes, p.getRegionName()); + byte[] pPrefix = StatisticsUtil.getRowKey(tableName, cfKey, p.getRegionName()); mutations.add(new Delete(pPrefix, writeTimeStamp)); long byteSize = 0; @@ -139,14 +139,14 @@ public class StatisticsWriter implements Closeable { if (midEndIndex > 0) { GuidePostsInfo lguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(0, midEndIndex)); tracker.clear(); - tracker.addGuidePost(fam, lguidePosts, byteSize, cell.getTimestamp()); - addStats(l.getRegionName(), tracker, fam, mutations); + tracker.addGuidePost(cfKey, lguidePosts, byteSize, cell.getTimestamp()); + addStats(l.getRegionName(), tracker, cfKey, mutations); } if (midStartIndex < guidePosts.getGuidePosts().size()) { GuidePostsInfo rguidePosts = new GuidePostsInfo(byteSize, guidePosts.getGuidePosts().subList(midStartIndex, guidePosts.getGuidePosts().size())); tracker.clear(); - tracker.addGuidePost(fam, rguidePosts, byteSize, cell.getTimestamp()); - addStats(r.getRegionName(), tracker, fam, mutations); + tracker.addGuidePost(cfKey, rguidePosts, byteSize, cell.getTimestamp()); + addStats(r.getRegionName(), tracker, cfKey, mutations); } } } @@ -157,7 +157,7 @@ public class StatisticsWriter implements Closeable { * then we use Upsert queries to update the table * If the region gets splitted or the major compaction happens we update using HTable.put() * @param tracker - the statistics tracker - * @param fam - the family for which the stats is getting collected. + * @param cfKey - the family for which the stats is getting collected. * @param mutations - list of mutations that collects all the mutations to commit in a batch * @param tablekey - The table name * @param schemaName - the schema name associated with the table @@ -167,7 +167,7 @@ public class StatisticsWriter implements Closeable { * if we fail to do any of the puts. Any single failure will prevent any future attempts for the remaining list of stats to * update */ - public void addStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { + public void addStats(byte[] regionName, StatisticsCollector tracker, ImmutableBytesPtr cfKey, List<Mutation> mutations) throws IOException { if (tracker == null) { return; } boolean useMaxTimeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP; long timeStamp = clientTimeStamp; @@ -175,10 +175,9 @@ public class StatisticsWriter implements Closeable { timeStamp = tracker.getMaxTimeStamp(); mutations.add(getLastStatsUpdatedTimePut(timeStamp)); } - byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam), - regionName); + byte[] prefix = StatisticsUtil.getRowKey(tableName, cfKey, regionName); Put put = new Put(prefix); - GuidePostsInfo gp = tracker.getGuidePosts(fam); + GuidePostsInfo gp = tracker.getGuidePosts(cfKey); if (gp != null) { put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_COUNT_BYTES, timeStamp, PDataType.LONG.toBytes((gp.getGuidePosts().size()))); @@ -187,16 +186,6 @@ public class StatisticsWriter implements Closeable { put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES, timeStamp, PDataType.LONG.toBytes(gp.getByteCount())); } - byte[] minKey = tracker.getMinKey(fam); - if (minKey != null) { - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES, - timeStamp, PDataType.VARBINARY.toBytes(minKey)); - } - byte[] maxKey = tracker.getMaxKey(fam); - if (maxKey != null) { - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, - timeStamp, PDataType.VARBINARY.toBytes(maxKey)); - } // Add our empty column value so queries behave correctly put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, ByteUtil.EMPTY_BYTE_ARRAY); @@ -248,11 +237,10 @@ public class StatisticsWriter implements Closeable { statsWriterTable.put(put); } - public void deleteStats(byte[] regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) + public void deleteStats(byte[] regionName, StatisticsCollector tracker, ImmutableBytesPtr fam, List<Mutation> mutations) throws IOException { long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp() : clientTimeStamp; - byte[] prefix = StatisticsUtil.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam), - regionName); + byte[] prefix = StatisticsUtil.getRowKey(tableName, fam, regionName); mutations.add(new Delete(prefix, timeStamp - 1)); } } \ No newline at end of file