PHOENIX-2249 SYSTEM.STATS not updated after region merge occurs. (Ankit Singhal)
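
In short: when a region that resulted from a merge goes through major compaction, the compaction hook now looks up the two parent regions from hbase:meta (via MetaTableAccessor.getRegionsFromMergeQualifier) and hands them to the statistics scanner, which deletes their stale SYSTEM.STATS rows before guideposts are written for the merged region. The standalone snippet below is only a sketch of the lookup step, assuming an HBase 1.x client Connection; in the patch itself the Connection and region name come from the coprocessor environment, guarded by store.hasReferences().

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Pair;

// Illustrative only: resolve the two parent regions recorded in hbase:meta
// (info:mergeA / info:mergeB in HBase 1.x) for a region produced by a merge.
public class MergeParentLookup {
    public static Pair<HRegionInfo, HRegionInfo> findMergeParents(Connection conn, byte[] mergedRegionName)
            throws IOException {
        // Returns null when the region carries no merge qualifiers, i.e. it is
        // not the product of a merge.
        return MetaTableAccessor.getRegionsFromMergeQualifier(conn, mergedRegionName);
    }
}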


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/42506f40
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/42506f40
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/42506f40

Branch: refs/heads/4.x-HBase-1.0
Commit: 42506f4052fb1c9a4b3822a946fee7a90b7fd35d
Parents: d478eb1
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Tue Dec 15 17:10:58 2015 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Tue Dec 15 17:10:58 2015 +0530

----------------------------------------------------------------------
 .../UngroupedAggregateRegionObserver.java       | 15 ++++++++---
 .../schema/stats/StatisticsCollector.java       | 12 +++++----
 .../phoenix/schema/stats/StatisticsScanner.java | 26 +++++++++++++++++---
 3 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/42506f40/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 4325760..a19cac9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -41,7 +41,9 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
@@ -105,11 +108,11 @@ import org.apache.phoenix.util.TimeKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.TxConstants;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import co.cask.tephra.TxConstants;
+
 
 /**
 * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY).
@@ -627,6 +630,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                 boolean useCurrentTime = 
                         c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
                                 QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+                Connection conn = c.getEnvironment().getRegionServerServices().getConnection();
+                Pair<HRegionInfo, HRegionInfo> mergeRegions = null;
+                if (store.hasReferences()) {
+                    mergeRegions = MetaTableAccessor.getRegionsFromMergeQualifier(conn,
+                            c.getEnvironment().getRegion().getRegionInfo().getRegionName());
+                }
                 // Provides a means of clients controlling their timestamps to not use current time
                 // when background tasks are updating stats. Instead we track the max timestamp of
                 // the cells and use that.
@@ -634,7 +643,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{
                 StatisticsCollector stats = new StatisticsCollector(
                         c.getEnvironment(), table.getNameAsString(),
                         clientTimeStamp, store.getFamily().getName());
-                internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner);
+                internalScanner = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanner, mergeRegions);
             } catch (IOException e) {
                 // If we can't reach the stats table, don't interrupt the normal
                 // compaction operation, just log a warning.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42506f40/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 272cac6..22fdf90 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -215,13 +216,14 @@ public class StatisticsCollector {
         }
     }
 
-    public InternalScanner createCompactionScanner(HRegion region, Store store, InternalScanner s) throws IOException {
+    public InternalScanner createCompactionScanner(HRegion region, Store store, InternalScanner s,
+            Pair<HRegionInfo, HRegionInfo> mergeRegions) throws IOException {
         // See if this is for Major compaction
         if (logger.isDebugEnabled()) {
             logger.debug("Compaction scanner created for stats");
         }
        ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
-        return getInternalScanner(region, store, s, cfKey);
+        return getInternalScanner(region, store, s, cfKey, mergeRegions);
     }
 
     public void splitStats(HRegion parent, HRegion left, HRegion right) {
@@ -243,9 +245,9 @@ public class StatisticsCollector {
         }
     }
 
-    protected InternalScanner getInternalScanner(HRegion region, Store store,
-            InternalScanner internalScan, ImmutableBytesPtr family) {
-        return new StatisticsScanner(this, statsTable, region, internalScan, family);
+    protected InternalScanner getInternalScanner(HRegion region, Store store, InternalScanner internalScan,
+            ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) {
+        return new StatisticsScanner(this, statsTable, region, internalScan, family, mergeRegions);
     }
 
     public void clear() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/42506f40/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index de59304..5c460e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -24,10 +24,12 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
@@ -40,14 +42,16 @@ public class StatisticsScanner implements InternalScanner {
     private HRegion region;
     private StatisticsCollector tracker;
     private ImmutableBytesPtr family;
+    private Pair<HRegionInfo, HRegionInfo> mergeRegions;
 
     public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, HRegion region,
-            InternalScanner delegate, ImmutableBytesPtr family) {
+            InternalScanner delegate, ImmutableBytesPtr family, Pair<HRegionInfo, HRegionInfo> mergeRegions) {
         this.tracker = tracker;
         this.stats = stats;
         this.delegate = delegate;
         this.region = region;
         this.family = family;
+        this.mergeRegions = mergeRegions;
     }
 
     @Override
@@ -83,6 +87,22 @@ public class StatisticsScanner implements InternalScanner {
             // update the statistics table
             // Just verify if this if fine
             ArrayList<Mutation> mutations = new ArrayList<Mutation>();
+            if (mergeRegions != null) {
+                if (mergeRegions.getFirst() != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Deleting stale stats for the region "
+                                + mergeRegions.getFirst().getRegionNameAsString() + " as part of major compaction");
+                    }
+                    stats.deleteStats(mergeRegions.getFirst().getRegionName(), tracker, family, mutations);
+                }
+                if (mergeRegions.getSecond() != null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Deleting stale stats for the region "
+                                + mergeRegions.getSecond().getRegionNameAsString() + " as part of major compaction");
+                    }
+                    stats.deleteStats(mergeRegions.getSecond().getRegionName(), tracker, family, mutations);
+                }
+            }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Deleting the stats for the region " + 
region.getRegionNameAsString()
                         + " as part of major compaction");
@@ -115,9 +135,7 @@ public class StatisticsScanner implements InternalScanner {
                     if (toThrow == null) toThrow = e;
                     LOG.error("Error while closing the scanner", e);
                 } finally {
-                    if (toThrow != null) {
-                        throw toThrow;
-                    }
+                    if (toThrow != null) { throw toThrow; }
                 }
             }
         }
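
A note on the StatisticsScanner change above: the two near-identical blocks added to close() could equally be expressed as a small helper, shown below purely as an illustrative sketch. deleteMergeParentStats is a hypothetical name, not part of the patch; the sketch relies only on the mergeRegions field and the same StatisticsWriter.deleteStats(regionName, tracker, family, mutations) call that the diff uses.

    // Hypothetical helper (not in the patch): purge stale SYSTEM.STATS rows for
    // both merge parents before stats for the merged region are written.
    private void deleteMergeParentStats(List<Mutation> mutations) throws IOException {
        if (mergeRegions == null) {
            return; // region is not the product of a merge
        }
        for (HRegionInfo parent : new HRegionInfo[] { mergeRegions.getFirst(), mergeRegions.getSecond() }) {
            if (parent == null) {
                continue;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Deleting stale stats for the region " + parent.getRegionNameAsString()
                        + " as part of major compaction");
            }
            stats.deleteStats(parent.getRegionName(), tracker, family, mutations);
        }
    }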
