PHOENIX-2249 SYSTEM.STATS is not updated after a region merge occurs. (Ankit Singhal)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42506f40 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42506f40 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42506f40 Branch: refs/heads/4.x-HBase-1.0 Commit: 42506f4052fb1c9a4b3822a946fee7a90b7fd35d Parents: d478eb1 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Tue Dec 15 17:10:58 2015 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Tue Dec 15 17:10:58 2015 +0530 ---------------------------------------------------------------------- .../UngroupedAggregateRegionObserver.java | 15 ++++++++--- .../schema/stats/StatisticsCollector.java | 12 +++++---- .../phoenix/schema/stats/StatisticsScanner.java | 26 +++++++++++++++++--- 3 files changed, 41 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/42506f40/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 4325760..a19cac9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; @@ -105,11 +108,11 @@ import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import co.cask.tephra.TxConstants; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import co.cask.tephra.TxConstants; + /** * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY). @@ -627,6 +630,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ boolean useCurrentTime = c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); + Connection conn = c.getEnvironment().getRegionServerServices().getConnection(); + Pair<HRegionInfo, HRegionInfo> mergeRegions = null; + if (store.hasReferences()) { + mergeRegions = MetaTableAccessor.getRegionsFromMergeQualifier(conn, + c.getEnvironment().getRegion().getRegionInfo().getRegionName()); + } // Provides a means of clients controlling their timestamps to not use current time // when background tasks are updating stats. Instead we track the max timestamp of // the cells and use that. 
@@ -634,7 +643,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ StatisticsCollector stats = new StatisticsCollector( c.getEnvironment(), table.getNameAsString(), clientTimeStamp, store.getFamily().getName()); - internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner); + internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner, mergeRegions); } catch (IOException e) { // If we can't reach the stats table, don't interrupt the normal // compaction operation, just log a warning. http://git-wip-us.apache.org/repos/asf/phoenix/blob/42506f40/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index 272cac6..22fdf90 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; @@ -215,13 +216,14 @@ public class StatisticsCollector { } } - public InternalScanner createCompactionScanner(HRegion region, Store store, InternalScanner s) throws IOException { + public InternalScanner createCompactionScanner(HRegion region, Store store, InternalScanner s, + Pair<HRegionInfo, HRegionInfo> mergeRegions) throws IOException { // See if this is for Major compaction if (logger.isDebugEnabled()) { logger.debug("Compaction scanner 
created for stats"); } ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName()); - return getInternalScanner(region, store, s, cfKey); + return getInternalScanner(region, store, s, cfKey, mergeRegions); } public void splitStats(HRegion parent, HRegion left, HRegion right) { @@ -243,9 +245,9 @@ public class StatisticsCollector { } } - protected InternalScanner getInternalScanner(HRegion region, Store store, - InternalScanner internalScan, ImmutableBytesPtr family) { - return new StatisticsScanner(this, statsTable, region, internalScan, family); + protected InternalScanner getInternalScanner(HRegion region, Store store, InternalScanner internalScan, + ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) { + return new StatisticsScanner(this, statsTable, region, internalScan, family, mergeRegions); } public void clear() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/42506f40/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index de59304..5c460e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -24,10 +24,12 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** @@ -40,14 
+42,16 @@ public class StatisticsScanner implements InternalScanner { private HRegion region; private StatisticsCollector tracker; private ImmutableBytesPtr family; + private Pair<HRegionInfo, HRegionInfo> mergeRegions; public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region, - InternalScanner delegate, ImmutableBytesPtr family) { + InternalScanner delegate, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) { this.tracker = tracker; this.stats = stats; this.delegate = delegate; this.region = region; this.family = family; + this.mergeRegions = mergeRegions; } @Override @@ -83,6 +87,22 @@ public class StatisticsScanner implements InternalScanner { // update the statistics table // Just verify if this if fine ArrayList<Mutation> mutations = new ArrayList<Mutation>(); + if (mergeRegions != null) { + if (mergeRegions.getFirst() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting stale stats for the region " + + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction"); + } + stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations); + } + if (mergeRegions.getSecond() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Deleting stale stats for the region " + + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction"); + } + stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations); + } + } if (LOG.isDebugEnabled()) { LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); @@ -115,9 +135,7 @@ public class StatisticsScanner implements InternalScanner { if (toThrow == null) toThrow = e; LOG.error("Error while closing the scanner", e); } finally { - if (toThrow != null) { - throw toThrow; - } + if (toThrow != null) { throw toThrow; } } } }
