Repository: phoenix
Updated Branches:
  refs/heads/master 28a8b802c -> c2cc1be60


LP-2692 Config setting for disabling stats

Add configuration setting to allow disabling stats collection, for
environments where it is not desired or is causing issues.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c2cc1be6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c2cc1be6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c2cc1be6

Branch: refs/heads/master
Commit: c2cc1be60492844779ab713d5cd84d37a17e6651
Parents: 28a8b80
Author: Gabriel Reid <gabri...@ngdata.com>
Authored: Thu Feb 18 10:20:36 2016 +0100
Committer: Gabriel Reid <gabri...@ngdata.com>
Committed: Fri Feb 19 15:22:53 2016 +0100

----------------------------------------------------------------------
 .../end2end/StatsCollectionDisabledIT.java      |  70 ++++++
 .../UngroupedAggregateRegionObserver.java       |  12 +-
 .../org/apache/phoenix/query/QueryServices.java |   1 +
 .../stats/DefaultStatisticsCollector.java       | 223 +++++++++++++++++++
 .../schema/stats/NoOpStatisticsCollector.java   |  72 ++++++
 .../phoenix/schema/stats/PTableStats.java       |   2 +-
 .../schema/stats/StatisticsCollector.java       | 213 +++---------------
 .../stats/StatisticsCollectorFactory.java       |  63 ++++++
 .../phoenix/schema/stats/StatisticsScanner.java |   2 +-
 .../phoenix/schema/stats/StatisticsWriter.java  |   6 +-
 10 files changed, 471 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java
new file mode 100644
index 0000000..a92a665
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectionDisabledIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Verifies that statistics are not collected if they are disabled via the
+ * {@link QueryServices#STATS_ENABLED_ATTRIB} setting.
+ */
+public class StatsCollectionDisabledIT extends StatsCollectorAbstractIT {
+
+    /**
+     * Starts the test driver with statistics collection disabled.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Must update config before starting server
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        props.put(QueryServices.STATS_ENABLED_ATTRIB, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /**
+     * Creates and populates a table, runs UPDATE STATISTICS on it, and asserts that
+     * SYSTEM.STATS contains no rows afterwards.
+     */
+    @Test
+    public void testStatisticsAreNotWritten() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        // Release every JDBC resource even when a statement or the assertion fails,
+        // so a failing run does not leak the connection into later tests.
+        try {
+            Statement stmt = conn.createStatement();
+            try {
+                stmt.execute("CREATE TABLE T1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+                stmt.execute("UPSERT INTO T1 VALUES (1, 'NAME1')");
+                stmt.execute("UPSERT INTO T1 VALUES (2, 'NAME2')");
+                stmt.execute("UPSERT INTO T1 VALUES (3, 'NAME3')");
+                conn.commit();
+                stmt.execute("UPDATE STATISTICS T1");
+                ResultSet rs = stmt.executeQuery("SELECT * FROM SYSTEM.STATS");
+                try {
+                    assertFalse(rs.next());
+                } finally {
+                    rs.close();
+                }
+            } finally {
+                stmt.close();
+            }
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d850eab..6550653 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -89,6 +89,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
+import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PBinary;
 import org.apache.phoenix.schema.types.PChar;
@@ -184,9 +185,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             byte[] gp_per_region_bytes =
                     
scan.getAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION);
             // Let this throw, as this scan is being done for the sole purpose 
of collecting stats
-            StatisticsCollector statsCollector =
-                    new StatisticsCollector(env, 
region.getRegionInfo().getTable()
-                            .getNameAsString(), ts, gp_width_bytes, 
gp_per_region_bytes);
+            StatisticsCollector statsCollector = 
StatisticsCollectorFactory.createStatisticsCollector(
+                    env, region.getRegionInfo().getTable().getNameAsString(), 
ts,
+                    gp_width_bytes, gp_per_region_bytes);
             return collectStats(s, statsCollector, region, scan, 
env.getConfiguration());
         }
         int offsetToBe = 0;
@@ -609,8 +610,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             try {
                 Pair<HRegionInfo, HRegionInfo> mergeRegions = null;
                 long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
-                StatisticsCollector stats = new 
StatisticsCollector(c.getEnvironment(), table.getNameAsString(),
-                        clientTimeStamp, store.getFamily().getName());
+                StatisticsCollector stats = 
StatisticsCollectorFactory.createStatisticsCollector(
+                        c.getEnvironment(), table.getNameAsString(), 
clientTimeStamp,
+                        store.getFamily().getName());
                 internalScanner = 
stats.createCompactionScanner(c.getEnvironment(), store, scanner);
             } catch (IOException e) {
                 // If we can't reach the stats table, don't interrupt the 
normal

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index fe40d60..1efcd8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -154,6 +154,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = 
"phoenix.stats.guidepost.width";
     public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = 
"phoenix.stats.guidepost.per.region";
     public static final String STATS_USE_CURRENT_TIME_ATTRIB = 
"phoenix.stats.useCurrentTime";
+    public static final String STATS_ENABLED_ATTRIB = "phoenix.stats.enabled";
     public static final String RUN_UPDATE_STATS_ASYNC = 
"phoenix.update.stats.command.async";
     public static final String STATS_SERVER_POOL_SIZE = 
"phoenix.stats.pool.size";
     public static final String COMMIT_STATS_ASYNC = 
"phoenix.stats.commit.async";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
new file mode 100644
index 0000000..96b35f1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.TimeKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A default implementation of the Statistics tracker that helps to collect stats like min key, max key and guideposts.
+ * TODO: review timestamps used for stats. We support the user controlling the timestamps, so we should honor that with
+ * timestamps for stats as well. The issue is for compaction, though. I don't know of a way for the user to specify any
+ * timestamp for that. Perhaps best to use current time across the board for now.
+ */
+class DefaultStatisticsCollector implements StatisticsCollector {
+    private static final Logger logger = LoggerFactory.getLogger(DefaultStatisticsCollector.class);
+
+    /** Byte threshold at which a guidepost is attempted; computed once by StatisticsUtil.getGuidePostDepth. */
+    private final long guidepostDepth;
+    /** Largest cell timestamp seen by {@link #collectStatistics(List)} since construction or the last clear(). */
+    private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+    /** Per column family: bytes accumulated since the last added guidepost, paired with the guidepost builder. */
+    private final Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap =
+            Maps.newHashMap();
+    protected StatisticsWriter statsTable;
+    /** Non-null only when a single column family was handed to the constructor. */
+    private Pair<Long, GuidePostsInfoBuilder> cachedGps = null;
+
+    /**
+     * Creates a collector for one region.
+     *
+     * @param env coprocessor environment supplying the configuration and the region's table descriptor
+     * @param tableName name of the table, passed on to the statistics writer
+     * @param clientTimeStamp timestamp passed on to the statistics writer; replaced by
+     *            {@link StatisticsCollector#NO_TIMESTAMP} when
+     *            {@link QueryServices#STATS_USE_CURRENT_TIME_ATTRIB} is false
+     * @param family the single column family to collect for, or null when it is not known ahead of time
+     * @param gp_width_bytes encoded guidepost width, or null to read the width from the configuration
+     * @param gp_per_region_bytes encoded guideposts-per-region count, or null to read it from the configuration
+     * @throws IOException if setting up the collector fails
+     */
+    DefaultStatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family,
+            byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException {
+        Configuration config = env.getConfiguration();
+        int guidepostPerRegion = gp_per_region_bytes == null
+                ? config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION)
+                : PInteger.INSTANCE.getCodec().decodeInt(gp_per_region_bytes, 0, SortOrder.getDefault());
+        // The width is a long: decode it as one so that widths beyond the int range are not
+        // squeezed through the int decoding path.
+        long guidepostWidth = gp_width_bytes == null
+                ? config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES)
+                : PLong.INSTANCE.getCodec().decodeLong(gp_width_bytes, 0, SortOrder.getDefault());
+        this.guidepostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
+                env.getRegion().getTableDesc());
+        // Provides a means of clients controlling their timestamps to not use current time
+        // when background tasks are updating stats. Instead we track the max timestamp of
+        // the cells and use that.
+        boolean useCurrentTime = config.getBoolean(
+                QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+        if (!useCurrentTime) {
+            clientTimeStamp = StatisticsCollector.NO_TIMESTAMP;
+        }
+        // Get the stats table associated with the current table on which the CP is
+        // triggered
+        this.statsTable = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
+        // in a compaction we know the one family ahead of time
+        if (family != null) {
+            ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family);
+            cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0L, new GuidePostsInfoBuilder());
+            guidePostsInfoWriterMap.put(cfKey, cachedGps);
+        }
+    }
+
+    @Override
+    public long getMaxTimeStamp() {
+        return maxTimeStamp;
+    }
+
+    /** Closes the underlying statistics writer. */
+    @Override
+    public void close() throws IOException {
+        this.statsTable.close();
+    }
+
+    /**
+     * Builds the delete and add mutations for every collected family and commits them.
+     * An IOException is logged rather than propagated; the collected state is cleared
+     * in either case.
+     *
+     * @param region the region whose statistics are being replaced
+     */
+    @Override
+    public void updateStatistic(Region region) {
+        try {
+            List<Mutation> mutations = new ArrayList<Mutation>();
+            writeStatsToStatsTable(region, true, mutations);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Committing new stats for the region " + region.getRegionInfo());
+            }
+            commitStats(mutations);
+        } catch (IOException e) {
+            logger.error("Unable to commit new stats", e);
+        } finally {
+            clear();
+        }
+    }
+
+    /**
+     * Appends the statistics mutations for every family in the guidepost map to {@code mutations}.
+     *
+     * @param region the region the statistics belong to
+     * @param delete whether to add delete mutations for the existing statistics before the new ones
+     * @param mutations output list the mutations are appended to
+     * @throws IOException if the statistics writer fails; the failure is logged and rethrown
+     */
+    private void writeStatsToStatsTable(final Region region, boolean delete, List<Mutation> mutations)
+            throws IOException {
+        try {
+            // update the statistics table
+            for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) {
+                if (delete) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Deleting the stats for the region " + region.getRegionInfo());
+                    }
+                    statsTable.deleteStats(region, this, fam, mutations);
+                }
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Adding new stats for the region " + region.getRegionInfo());
+                }
+                statsTable.addStats(this, fam, mutations);
+            }
+        } catch (IOException e) {
+            logger.error("Failed to update statistics table!", e);
+            throw e;
+        }
+    }
+
+    /** Hands the accumulated mutations to the statistics writer. */
+    private void commitStats(List<Mutation> mutations) throws IOException {
+        statsTable.commitStats(mutations);
+    }
+
+    /**
+     * Update the current statistics based on the latest batch of key-values from the underlying scanner
+     * 
+     * @param results
+     *            next batch of {@link KeyValue}s
+     */
+    @Override
+    public void collectStatistics(final List<Cell> results) {
+        // Families already seen in this batch, so each gets at most one row-count increment per call
+        Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap();
+        for (Cell cell : results) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
+            Pair<Long, GuidePostsInfoBuilder> gps;
+            if (cachedGps == null) {
+                ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(),
+                        kv.getFamilyLength());
+                gps = guidePostsInfoWriterMap.get(cfKey);
+                if (gps == null) {
+                    gps = new Pair<Long, GuidePostsInfoBuilder>(0L, new GuidePostsInfoBuilder());
+                    guidePostsInfoWriterMap.put(cfKey, gps);
+                }
+                if (famMap.get(cfKey) == null) {
+                    famMap.put(cfKey, true);
+                    gps.getSecond().incrementRowCount();
+                }
+            } else {
+                gps = cachedGps;
+                // NOTE(review): unlike the branch above, this increments once per cell rather than
+                // once per batch - confirm that is the intended row count.
+                cachedGps.getSecond().incrementRowCount();
+            }
+            int kvLength = kv.getLength();
+            long byteCount = gps.getFirst() + kvLength;
+            gps.setFirst(byteCount);
+            if (byteCount >= guidepostDepth) {
+                ImmutableBytesWritable row =
+                        new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+                // Counters restart only when the builder accepted the guidepost
+                if (gps.getSecond().addGuidePosts(row, byteCount, gps.getSecond().getRowCount())) {
+                    gps.setFirst(0L);
+                    gps.getSecond().resetRowCount();
+                }
+            }
+        }
+    }
+
+    /**
+     * Wraps the given compaction scanner in a statistics-collecting scanner for the store's column family.
+     */
+    @Override
+    public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
+            InternalScanner s) throws IOException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Compaction scanner created for stats");
+        }
+        ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
+        return getInternalScanner(env, s, cfKey);
+    }
+
+    /** Creates the {@link StatisticsScanner} that feeds this collector from {@code internalScan}. */
+    protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment env, InternalScanner internalScan,
+            ImmutableBytesPtr family) {
+        return new StatisticsScanner(this, statsTable, env, internalScan, family);
+    }
+
+    /**
+     * Drops all collected guideposts and resets the max timestamp.
+     * NOTE(review): cachedGps is left as is, so a collector built with a family keeps accumulating
+     * into a pair that is no longer in the map - confirm collectors are not reused after clear().
+     */
+    @Override
+    public void clear() {
+        this.guidePostsInfoWriterMap.clear();
+        maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
+    }
+
+    /**
+     * @return the guideposts built so far for the given family, or null if nothing was collected for it
+     */
+    @Override
+    public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
+        Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam);
+        if (pair != null) { return pair.getSecond().build(); }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
new file mode 100644
index 0000000..1063229
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A {@link StatisticsCollector} whose operations all do nothing. It stands in for a real
+ * collector for tables or environments where statistics collection is disabled.
+ */
+public class NoOpStatisticsCollector implements StatisticsCollector {
+
+    /** Always reports {@link StatisticsCollector#NO_TIMESTAMP}, as no cells are ever inspected. */
+    @Override
+    public long getMaxTimeStamp() {
+        return NO_TIMESTAMP;
+    }
+
+    /** Ignores the batch; nothing is collected. */
+    @Override
+    public void collectStatistics(List<Cell> results) {
+        // No-op
+    }
+
+    /** Writes nothing for the region. */
+    @Override
+    public void updateStatistic(Region region) {
+        // No-op
+    }
+
+    /** Hands back the given scanner unwrapped, so compaction proceeds without stats collection. */
+    @Override
+    public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
+            InternalScanner delegate) throws IOException {
+        return delegate;
+    }
+
+    /** Always null: no guideposts exist for any family. */
+    @Override
+    public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
+        return null;
+    }
+
+    /** There is no state to reset. */
+    @Override
+    public void clear() {
+        // No-op
+    }
+
+    /** There are no resources to release. */
+    @Override
+    public void close() throws IOException {
+        // No-op
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
index 435fe87..f297b3b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStats.java
@@ -40,7 +40,7 @@ public interface PTableStats {
 
         @Override
         public long getTimestamp() {
-            return StatisticsCollector.NO_TIMESTAMP;
+            return DefaultStatisticsCollector.NO_TIMESTAMP;
         }
     };
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 185ceb8..1dcab08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -17,208 +17,55 @@
  */
 package org.apache.phoenix.schema.stats;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.util.TimeKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
 
 /**
- * A default implementation of the Statistics tracker that helps to collect 
stats like min key, max key and guideposts.
- * TODO: review timestamps used for stats. We support the user controlling the 
timestamps, so we should honor that with
- * timestamps for stats as well. The issue is for compaction, though. I don't 
know of a way for the user to specify any
- * timestamp for that. Perhaps best to use current time across the board for 
now.
+ * Statistics tracker that helps to collect stats like min key, max key and 
guideposts.
  */
-public class StatisticsCollector {
-    private static final Logger logger = 
LoggerFactory.getLogger(StatisticsCollector.class);
-    public static final long NO_TIMESTAMP = -1;
-
-    private long guidepostDepth;
-    private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
-    private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> 
guidePostsInfoWriterMap = Maps.newHashMap();
-    protected StatisticsWriter statsTable;
-    private Pair<Long, GuidePostsInfoBuilder> cachedGps = null;
-
-    public StatisticsCollector(RegionCoprocessorEnvironment env, String 
tableName, long clientTimeStamp,
-            byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws 
IOException {
-        this(env, tableName, clientTimeStamp, null, gp_width_bytes, 
gp_per_region_bytes);
-    }
-
-    public StatisticsCollector(RegionCoprocessorEnvironment env, String 
tableName, long clientTimeStamp, byte[] family)
-            throws IOException {
-        this(env, tableName, clientTimeStamp, family, null, null);
-    }
-
-    private StatisticsCollector(RegionCoprocessorEnvironment env, String 
tableName, long clientTimeStamp, byte[] family,
-            byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws 
IOException {
-        Configuration config = env.getConfiguration();
-        int guidepostPerRegion = gp_per_region_bytes == null
-                ? 
config.getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                        
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION)
-                : PInteger.INSTANCE.getCodec().decodeInt(gp_per_region_bytes, 
0, SortOrder.getDefault());
-        long guidepostWidth = gp_width_bytes == null
-                ? 
config.getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                        
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES)
-                : PLong.INSTANCE.getCodec().decodeInt(gp_width_bytes, 0, 
SortOrder.getDefault());
-        this.guidepostDepth = 
StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
-                env.getRegion().getTableDesc());
-        // Provides a means of clients controlling their timestamps to not use 
current time
-        // when background tasks are updating stats. Instead we track the max 
timestamp of
-        // the cells and use that.
-        boolean useCurrentTime = env.getConfiguration().getBoolean(
-                QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
-                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
-        if (!useCurrentTime) {
-            clientTimeStamp = StatisticsCollector.NO_TIMESTAMP;
-        }
-        // Get the stats table associated with the current table on which the 
CP is
-        // triggered
-        this.statsTable = StatisticsWriter.newWriter(env, tableName, 
clientTimeStamp);
-        // in a compaction we know the one family ahead of time
-        if (family != null) {
-            ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family);
-            cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new 
GuidePostsInfoBuilder());
-            guidePostsInfoWriterMap.put(cfKey, cachedGps);
-        }
-    }
+public interface StatisticsCollector extends Closeable {
 
-    public long getMaxTimeStamp() {
-        return maxTimeStamp;
-    }
-
-    public void close() throws IOException {
-        this.statsTable.close();
-    }
-
-    public void updateStatistic(Region region) {
-        try {
-            ArrayList<Mutation> mutations = new ArrayList<Mutation>();
-            writeStatsToStatsTable(region, true, mutations, 
TimeKeeper.SYSTEM.getCurrentTime());
-            if (logger.isDebugEnabled()) {
-                logger.debug("Committing new stats for the region " + 
region.getRegionInfo());
-            }
-            commitStats(mutations);
-        } catch (IOException e) {
-            logger.error("Unable to commit new stats", e);
-        } finally {
-            clear();
-        }
-    }
-
-    private void writeStatsToStatsTable(final Region region, boolean delete, 
List<Mutation> mutations, long currentTime)
-            throws IOException {
-        try {
-            // update the statistics table
-            for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) {
-                if (delete) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Deleting the stats for the region " + 
region.getRegionInfo());
-                    }
-                    statsTable.deleteStats(region, this, fam, mutations);
-                }
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Adding new stats for the region " + 
region.getRegionInfo());
-                }
-                statsTable.addStats(this, fam, mutations);
-            }
-        } catch (IOException e) {
-            logger.error("Failed to update statistics table!", e);
-            throw e;
-        }
-    }
-
-    private void commitStats(List<Mutation> mutations) throws IOException {
-        statsTable.commitStats(mutations);
-    }
+    /** Constant used if no max timestamp is available */
+    long NO_TIMESTAMP = -1;
 
     /**
-     * Update the current statistics based on the latest batch of key-values 
from the underlying scanner
-     * 
-     * @param results
-     *            next batch of {@link KeyValue}s
+     * Returns the maximum timestamp of all cells encountered while collecting 
statistics.
      */
-    public void collectStatistics(final List<Cell> results) {
-        Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap();
-        for (Cell cell : results) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp());
-            Pair<Long, GuidePostsInfoBuilder> gps;
-            if (cachedGps == null) {
-                ImmutableBytesPtr cfKey = new 
ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(),
-                        kv.getFamilyLength());
-                gps = guidePostsInfoWriterMap.get(cfKey);
-                if (gps == null) {
-                    gps = new Pair<Long, GuidePostsInfoBuilder>(0l,
-                            new GuidePostsInfoBuilder());
-                    guidePostsInfoWriterMap.put(cfKey, gps);
-                }
-                if (famMap.get(cfKey) == null) {
-                    famMap.put(cfKey, true);
-                    gps.getSecond().incrementRowCount();
-                }
-            } else {
-                gps = cachedGps;
-                cachedGps.getSecond().incrementRowCount();
-            }
-            int kvLength = kv.getLength();
-            long byteCount = gps.getFirst() + kvLength;
-            gps.setFirst(byteCount);
-            if (byteCount >= guidepostDepth) {
-                ImmutableBytesWritable row = new 
ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-                if (gps.getSecond().addGuidePosts(row, byteCount, 
gps.getSecond().getRowCount())) {
-                    gps.setFirst(0l);
-                    gps.getSecond().resetRowCount();
-                }
-            }
-        }
-    }
+    long getMaxTimeStamp();
 
-    public InternalScanner 
createCompactionScanner(RegionCoprocessorEnvironment env, Store store, 
InternalScanner s) throws IOException {
-        // See if this is for Major compaction
-        if (logger.isDebugEnabled()) {
-            logger.debug("Compaction scanner created for stats");
-        }
-        ImmutableBytesPtr cfKey = new 
ImmutableBytesPtr(store.getFamily().getName());
-        return getInternalScanner(env, store, s, cfKey);
-    }
+    /**
+     * Write the collected statistics for the given region.
+     * <p>
+     * The signature declares no checked exception, so an implementation cannot propagate an
+     * {@code IOException} to the caller from here.
+     *
+     * @param region the region whose collected statistics are written
+     */
+    void updateStatistic(Region region);
 
-    protected InternalScanner getInternalScanner(RegionCoprocessorEnvironment 
env, Store store,
-            InternalScanner internalScan, ImmutableBytesPtr family) {
-        return new StatisticsScanner(this, statsTable, env, internalScan, 
family);
-    }
+    /**
+     * Collect statistics for the given list of cells. This method can be called multiple
+     * times during collection of statistics.
+     *
+     * @param results the next batch of cells to include in the statistics
+     */
+    void collectStatistics(List<Cell> results);
 
-    public void clear() {
-        this.guidePostsInfoWriterMap.clear();
-        maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
-    }
+    /**
+     * Wrap a compaction scanner with a scanner that will collect statistics using this
+     * instance.
+     *
+     * @param env the region coprocessor environment
+     * @param store the store the scanner reads from (assumed; confirm against callers)
+     * @param delegate the compaction scanner to wrap
+     * @return the scanner to use in place of {@code delegate}
+     * @throws IOException if the wrapping scanner cannot be created
+     */
+    InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, 
Store store,
+            InternalScanner delegate) throws IOException;
 
-    public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
-        Pair<Long, GuidePostsInfoBuilder> pair = 
guidePostsInfoWriterMap.get(fam);
-        if (pair != null) { return pair.getSecond().build(); }
-        return null;
-    }
+    /**
+     * Clear all statistics information that has been collected.
+     * <p>
+     * NOTE(review): the implementation previously held in this file also reset the max
+     * timestamp when clearing -- confirm that implementations of this interface still do.
+     */
+    void clear();
 
+    /**
+     * Retrieve the calculated guide post info for the given column family.
+     *
+     * @param fam the column family to look up
+     * @return the guide post info for {@code fam}. NOTE(review): the implementation
+     *         previously held in this file returned {@code null} when nothing had been
+     *         collected for the family -- confirm whether callers must handle that
+     */
+    GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
new file mode 100644
index 0000000..aaffd73
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Provides new {@link StatisticsCollector} instances based on configuration 
settings for a
+ * table (or system-wide configuration of statistics).
+ */
+public final class StatisticsCollectorFactory {
+
+    /** Holder for static factory methods only; never instantiated. */
+    private StatisticsCollectorFactory() {
+    }
+
+    /**
+     * Creates a statistics collector from explicit guidepost settings. No store name is
+     * passed to the underlying {@link DefaultStatisticsCollector}.
+     *
+     * @param env environment whose configuration decides whether statistics are enabled
+     * @param tableName table name handed to the collector
+     * @param clientTimestamp client timestamp handed to the collector
+     * @param guidepostWidthBytes guidepost width setting, forwarded unchanged
+     * @param guidepostsPerRegionBytes guideposts-per-region setting, forwarded unchanged
+     * @return a {@link DefaultStatisticsCollector} when statistics are enabled, otherwise a
+     *         {@link NoOpStatisticsCollector}
+     * @throws IOException if the default collector cannot be created
+     */
+    public static StatisticsCollector createStatisticsCollector(RegionCoprocessorEnvironment env,
+            String tableName, long clientTimestamp, byte[] guidepostWidthBytes,
+            byte[] guidepostsPerRegionBytes) throws IOException {
+        if (statisticsEnabled(env)) {
+            return new DefaultStatisticsCollector(env, tableName, clientTimestamp, null,
+                    guidepostWidthBytes, guidepostsPerRegionBytes);
+        } else {
+            return new NoOpStatisticsCollector();
+        }
+    }
+
+    /**
+     * Creates a statistics collector for a named store. No guidepost settings are passed to
+     * the underlying {@link DefaultStatisticsCollector}.
+     *
+     * @param env environment whose configuration decides whether statistics are enabled
+     * @param tableName table name handed to the collector
+     * @param clientTimeStamp client timestamp handed to the collector
+     * @param storeName store name handed to the collector
+     * @return a {@link DefaultStatisticsCollector} when statistics are enabled, otherwise a
+     *         {@link NoOpStatisticsCollector}
+     * @throws IOException if the default collector cannot be created
+     */
+    public static StatisticsCollector createStatisticsCollector(
+            RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp,
+            byte[] storeName) throws IOException {
+        if (statisticsEnabled(env)) {
+            return new DefaultStatisticsCollector(env, tableName, clientTimeStamp, storeName,
+                    null, null);
+        } else {
+            return new NoOpStatisticsCollector();
+        }
+    }
+
+    /**
+     * Determines if statistics are enabled (which is the default). This is done on the
+     * RegionCoprocessorEnvironment for now to allow setting this on a per-table basis,
+     * although it could be moved to the general table metadata in the future if there is a
+     * realistic use case for that.
+     *
+     * @param env environment whose configuration is consulted
+     * @return the value of {@link QueryServices#STATS_ENABLED_ATTRIB}, or {@code true} when
+     *         it is not set
+     */
+    private static boolean statisticsEnabled(RegionCoprocessorEnvironment env) {
+        return env.getConfiguration().getBoolean(QueryServices.STATS_ENABLED_ATTRIB, true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index d798aa2..fb5b51c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
- * The scanner that does the scanning to collect the stats during major 
compaction.{@link StatisticsCollector}
+ * The scanner that does the scanning to collect the stats during major compaction.
+ * See {@link DefaultStatisticsCollector}.
  */
 public class StatisticsScanner implements InternalScanner {
     private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2cc1be6/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index dee5714..f8c888d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -79,7 +79,7 @@ public class StatisticsWriter implements Closeable {
         HTableInterface statsReaderTable = 
ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
         StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, 
statsWriterTable, tableName,
                 clientTimeStamp);
-        if (clientTimeStamp != StatisticsCollector.NO_TIMESTAMP) { // 
Otherwise we do this later as we don't know the ts
+        if (clientTimeStamp != DefaultStatisticsCollector.NO_TIMESTAMP) { // 
Otherwise we do this later as we don't know the ts
                                                                    // yet
             statsTable.commitLastStatsUpdatedTime();
         }
@@ -131,7 +131,7 @@ public class StatisticsWriter implements Closeable {
     public void addStats(StatisticsCollector tracker, ImmutableBytesPtr cfKey, 
List<Mutation> mutations)
             throws IOException {
         if (tracker == null) { return; }
-        boolean useMaxTimeStamp = clientTimeStamp == 
StatisticsCollector.NO_TIMESTAMP;
+        boolean useMaxTimeStamp = clientTimeStamp == 
DefaultStatisticsCollector.NO_TIMESTAMP;
         long timeStamp = clientTimeStamp;
         if (useMaxTimeStamp) { // When using max timestamp, we write the 
update time later because we only know the ts
                                // now
@@ -217,7 +217,7 @@ public class StatisticsWriter implements Closeable {
 
     public void deleteStats(Region region, StatisticsCollector tracker, 
ImmutableBytesPtr fam, List<Mutation> mutations)
             throws IOException {
-        long timeStamp = clientTimeStamp == StatisticsCollector.NO_TIMESTAMP ? 
tracker.getMaxTimeStamp()
+        long timeStamp = clientTimeStamp == 
DefaultStatisticsCollector.NO_TIMESTAMP ? tracker.getMaxTimeStamp()
                 : clientTimeStamp;
         List<Result> statsForRegion = 
StatisticsUtil.readStatistics(statsWriterTable, tableName, fam,
                 region.getRegionInfo().getStartKey(), 
region.getRegionInfo().getEndKey(), timeStamp);

Reply via email to