Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 47a82a62d -> 5626308be
PHOENIX-3028 StatisticsWriter should update the stats collection timestamp asynchronously Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5626308b Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5626308b Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5626308b Branch: refs/heads/4.x-HBase-0.98 Commit: 5626308bea040f79332b8cfb55c57301451ac61f Parents: 47a82a6 Author: Samarth <[email protected]> Authored: Tue Jun 28 09:36:20 2016 -0700 Committer: Samarth <[email protected]> Committed: Tue Jun 28 09:37:21 2016 -0700 ---------------------------------------------------------------------- .../stats/DefaultStatisticsCollector.java | 4 ++-- .../stats/StatisticsCollectionRunTracker.java | 1 - .../phoenix/schema/stats/StatisticsScanner.java | 21 ++++++++++---------- .../phoenix/schema/stats/StatisticsWriter.java | 15 ++++++-------- 4 files changed, 18 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5626308b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java index 9d47358..0247249 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java @@ -154,7 +154,7 @@ public class DefaultStatisticsCollector implements StatisticsCollector { } private void commitStats(List<Mutation> mutations) throws IOException { - statsWriter.commitStats(mutations); + statsWriter.commitStats(mutations, this); } /** @@ -215,7 +215,7 @@ public class DefaultStatisticsCollector implements StatisticsCollector { return getInternalScanner(env, store, s, cfKey); } - protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, + private InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner internalScan, ImmutableBytesPtr family) { return new StatisticsScanner(this, statsWriter, env, internalScan, family); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5626308b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java index 4ed3325..560fc0a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectionRunTracker.java @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5626308b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 4e6a18f..a9ce275 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -43,35 +43,34 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; public class StatisticsScanner implements InternalScanner { private static final Log LOG = LogFactory.getLog(StatisticsScanner.class); private InternalScanner delegate; - private StatisticsWriter stats; + private StatisticsWriter statsWriter; private HRegion region; private StatisticsCollector tracker; private ImmutableBytesPtr family; private final Configuration config; - public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env, + public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter statsWriter, RegionCoprocessorEnvironment env, InternalScanner delegate, ImmutableBytesPtr family) { this.tracker = tracker; - this.stats = stats; + this.statsWriter = statsWriter; this.delegate = delegate; this.region = env.getRegion(); this.config = env.getConfiguration(); this.family = family; - StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(region.getRegionInfo()); } @Override public boolean next(List<Cell> result) throws IOException { boolean ret = delegate.next(result); - updateStat(result); + updateStats(result); return ret; } @Override public boolean next(List<Cell> result, int limit) throws IOException { boolean ret = delegate.next(result, limit); - updateStat(result); + updateStats(result); return ret; } @@ -81,7 +80,7 @@ public class StatisticsScanner implements InternalScanner { * @param results * next batch of {@link KeyValue}s */ - protected void updateStat(final List<Cell> results) { + private void updateStats(final List<Cell> results) { if (!results.isEmpty()) { tracker.collectStatistics(results); } @@ -101,24 +100,24 @@ public class StatisticsScanner implements InternalScanner { LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.deleteStats(region, tracker, family, mutations); + statsWriter.deleteStats(region, tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.addStats(tracker, family, mutations); + statsWriter.addStats(tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.commitStats(mutations); + statsWriter.commitStats(mutations, tracker); } catch (IOException e) { LOG.error("Failed to update statistics table!", e); toThrow = e; } finally { try { statsRunState.removeCompactingRegion(region.getRegionInfo()); - stats.close(); + statsWriter.close(); tracker.close();// close the tracker } catch (IOException e) { if (toThrow == null) toThrow = e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5626308b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index 3bd3cef..1408493 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -78,10 +78,6 @@ public class StatisticsWriter implements Closeable { HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable); StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName, clientTimeStamp); - if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // Otherwise we do this later as we don't know the ts - // yet - statsTable.commitLastStatsUpdatedTime(); - } return statsTable; } @@ -193,7 +189,8 @@ public class StatisticsWriter implements Closeable { } } - public void commitStats(List<Mutation> mutations) throws IOException { + public void commitStats(List<Mutation> mutations, StatisticsCollector statsCollector) throws IOException { + commitLastStatsUpdatedTime(statsCollector); if (mutations.size() > 0) { byte[] row = mutations.get(0).getRow(); MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder(); @@ -220,10 +217,10 @@ public class StatisticsWriter implements Closeable { return put; } - private void commitLastStatsUpdatedTime() throws IOException { - // Always use wallclock time for this, as it's a mechanism to prevent - // stats from being collected too often. - Put put = getLastStatsUpdatedTimePut(clientTimeStamp); + private void commitLastStatsUpdatedTime(StatisticsCollector statsCollector) throws IOException { + long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? statsCollector.getMaxTimeStamp() + : clientTimeStamp; + Put put = getLastStatsUpdatedTimePut(timeStamp); statsWriterTable.put(put); }
