Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 0198eef2a -> e9a160d09


PHOENIX-3028 StatisticsWriter should update the stats collection timestamp 
asynchronously


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e9a160d0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e9a160d0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e9a160d0

Branch: refs/heads/4.x-HBase-1.0
Commit: e9a160d09fef757a2e92b2a52eef61cae231628d
Parents: 0198eef
Author: Samarth <[email protected]>
Authored: Tue Jun 28 10:05:37 2016 -0700
Committer: Samarth <[email protected]>
Committed: Tue Jun 28 10:05:37 2016 -0700

----------------------------------------------------------------------
 .../stats/DefaultStatisticsCollector.java       |  4 ++--
 .../stats/StatisticsCollectionRunTracker.java   |  1 -
 .../phoenix/schema/stats/StatisticsScanner.java | 21 ++++++++++----------
 .../phoenix/schema/stats/StatisticsWriter.java  | 14 +++++--------
 4 files changed, 17 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9a160d0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 09e9be5..6809d95 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -154,7 +154,7 @@ class DefaultStatisticsCollector implements 
StatisticsCollector {
     }
 
     private void commitStats(List<Mutation> mutations) throws IOException {
-        statsWriter.commitStats(mutations);
+        statsWriter.commitStats(mutations, this);
     }
 
     /**
@@ -215,7 +215,7 @@ class DefaultStatisticsCollector implements 
StatisticsCollector {
         return getInternalScanner(env, s, cfKey);
     }
 
-    protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment 
env, InternalScanner internalScan,
+    private InternalScanner getInternalScanner(RegionCoprocessorEnvironment 
env, InternalScanner internalScan,
             ImmutableBytesPtr family) {
         return new StatisticsScanner(this, statsWriter, env, internalScan, 
family);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9a160d0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
index 4ed3325..560fc0a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9a160d0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 4e6a18f..a9ce275 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -43,35 +43,34 @@ import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 public class StatisticsScanner implements InternalScanner {
     private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
     private InternalScanner delegate;
-    private StatisticsWriter stats;
+    private StatisticsWriter statsWriter;
     private HRegion region;
     private StatisticsCollector tracker;
     private ImmutableBytesPtr family;
     private final Configuration config;
 
-    public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter 
stats, RegionCoprocessorEnvironment env,
+    public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter 
statsWriter, RegionCoprocessorEnvironment env,
             InternalScanner delegate, ImmutableBytesPtr family) {
         this.tracker = tracker;
-        this.stats = stats;
+        this.statsWriter = statsWriter;
         this.delegate = delegate;
         this.region = env.getRegion();
         this.config = env.getConfiguration();
         this.family = family;
-
         
StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(region.getRegionInfo());
     }
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
         boolean ret = delegate.next(result);
-        updateStat(result);
+        updateStats(result);
         return ret;
     }
 
     @Override
     public boolean next(List<Cell> result, int limit) throws IOException {
         boolean ret = delegate.next(result, limit);
-        updateStat(result);
+        updateStats(result);
         return ret;
     }
 
@@ -81,7 +80,7 @@ public class StatisticsScanner implements InternalScanner {
      * @param results
      *            next batch of {@link KeyValue}s
      */
-    protected void updateStat(final List<Cell> results) {
+    private void updateStats(final List<Cell> results) {
         if (!results.isEmpty()) {
             tracker.collectStatistics(results);
         }
@@ -101,24 +100,24 @@ public class StatisticsScanner implements InternalScanner 
{
                     LOG.debug("Deleting the stats for the region " + 
region.getRegionNameAsString()
                         + " as part of major compaction");
                 }
-                stats.deleteStats(region, tracker, family, mutations);
+                statsWriter.deleteStats(region, tracker, family, mutations);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Adding new stats for the region " + 
region.getRegionNameAsString()
                         + " as part of major compaction");
                 }
-                stats.addStats(tracker, family, mutations);
+                statsWriter.addStats(tracker, family, mutations);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Committing new stats for the region " + 
region.getRegionNameAsString()
                         + " as part of major compaction");
                 }
-                stats.commitStats(mutations);
+                statsWriter.commitStats(mutations, tracker);
             } catch (IOException e) {
                 LOG.error("Failed to update statistics table!", e);
                 toThrow = e;
             } finally {
                 try {
                     
statsRunState.removeCompactingRegion(region.getRegionInfo());
-                    stats.close();
+                    statsWriter.close();
                     tracker.close();// close the tracker
                 } catch (IOException e) {
                     if (toThrow == null) toThrow = e;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e9a160d0/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index 86c8ce9..c7edf6f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -78,10 +78,6 @@ public class StatisticsWriter implements Closeable {
         HTableInterface statsReaderTable = 
ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
         StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, 
statsWriterTable, tableName,
                 clientTimeStamp);
-        if (clientTimeStamp != DefaultStatisticsCollector.NO_TIMESTAMP) { // 
Otherwise we do this later as we don't know the ts
-                                                                   // yet
-            statsTable.commitLastStatsUpdatedTime();
-        }
         return statsTable;
     }
 
@@ -193,7 +189,8 @@ public class StatisticsWriter implements Closeable {
         }
     }
 
-    public void commitStats(List<Mutation> mutations) throws IOException {
+    public void commitStats(List<Mutation> mutations, StatisticsCollector 
statsCollector) throws IOException {
+        commitLastStatsUpdatedTime(statsCollector);
         if (mutations.size() > 0) {
             byte[] row = mutations.get(0).getRow();
             MutateRowsRequest.Builder mrmBuilder = 
MutateRowsRequest.newBuilder();
@@ -220,10 +217,9 @@ public class StatisticsWriter implements Closeable {
         return put;
     }
 
-    private void commitLastStatsUpdatedTime() throws IOException {
-        // Always use wallclock time for this, as it's a mechanism to prevent
-        // stats from being collected too often.
-        Put put = getLastStatsUpdatedTimePut(clientTimeStamp);
+    private void commitLastStatsUpdatedTime(StatisticsCollector 
statsCollector) throws IOException {
+        long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? 
statsCollector.getMaxTimeStamp() : clientTimeStamp;
+        Put put = getLastStatsUpdatedTimePut(timeStamp);
         statsWriterTable.put(put);
     }
 

Reply via email to