Repository: phoenix Updated Branches: refs/heads/4.0 763f10f00 -> 5668817de
PHOENIX-1321 Cleanup setting of timestamps when collecting and using stats Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5668817d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5668817d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5668817d Branch: refs/heads/4.0 Commit: 5668817dea05ea105f26648176be3f124a4157f2 Parents: 763f10f Author: James Taylor <[email protected]> Authored: Mon Oct 6 16:02:37 2014 -0700 Committer: James Taylor <[email protected]> Committed: Mon Oct 6 16:02:37 2014 -0700 ---------------------------------------------------------------------- .../end2end/BaseTenantSpecificTablesIT.java | 2 +- .../org/apache/phoenix/end2end/KeyOnlyIT.java | 33 ++++-- .../phoenix/end2end/MultiCfQueryExecIT.java | 27 +++-- .../phoenix/end2end/ParallelIteratorsIT.java | 2 +- .../phoenix/end2end/StatsCollectorIT.java | 2 +- .../phoenix/end2end/index/SaltedIndexIT.java | 2 +- .../phoenix/mapreduce/CsvBulkLoadToolIT.java | 22 +++- .../coprocessor/MetaDataEndpointImpl.java | 105 +++-------------- .../UngroupedAggregateRegionObserver.java | 16 ++- .../org/apache/phoenix/query/QueryServices.java | 8 +- .../phoenix/query/QueryServicesOptions.java | 28 +++-- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../schema/stat/StatisticsCollector.java | 27 +++-- .../phoenix/schema/stat/StatisticsScanner.java | 8 +- .../phoenix/schema/stat/StatisticsTable.java | 90 +++++++-------- .../phoenix/schema/stat/StatisticsUtils.java | 115 ++++++++++++++----- .../org/apache/phoenix/util/MetaDataUtil.java | 12 ++ .../java/org/apache/phoenix/query/BaseTest.java | 2 +- .../phoenix/query/QueryServicesTestImpl.java | 8 +- .../java/org/apache/phoenix/util/TestUtil.java | 13 +++ 20 files changed, 285 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java index bcae7ed..b8fa035 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificTablesIT.java @@ -84,7 +84,7 @@ public abstract class BaseTenantSpecificTablesIT extends BaseClientManagedTimeIT public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(3); // Must update config before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20l)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); PHOENIX_JDBC_TENANT_SPECIFIC_URL = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID; PHOENIX_JDBC_TENANT_SPECIFIC_URL2 = getUrl() + ';' + TENANT_ID_ATTRIB + '=' + TENANT_ID2; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java index 4dee5d8..9b26c2e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/KeyOnlyIT.java @@ -52,7 +52,7 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(3); // Must update config 
before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @Test @@ -60,11 +60,14 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { long ts = nextTimestamp(); ensureTableCreated(getUrl(),KEYONLY_NAME,null, ts); initTableValues(ts+1); - Properties props = new Properties(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+30)); + Connection conn3 = DriverManager.getConnection(getUrl(), props); + analyzeTable(conn3, KEYONLY_NAME); + conn3.close(); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+5)); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+50)); Connection conn5 = DriverManager.getConnection(getUrl(), props); - analyzeTable(conn5, KEYONLY_NAME); String query = "SELECT i1, i2 FROM KEYONLY"; PreparedStatement statement = conn5.prepareStatement(query); ResultSet rs = statement.executeQuery(); @@ -79,12 +82,12 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { assertEquals(3, splits.size()); conn5.close(); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+6)); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+60)); Connection conn6 = DriverManager.getConnection(getUrl(), props); conn6.createStatement().execute("ALTER TABLE KEYONLY ADD s1 varchar"); conn6.close(); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+7)); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+70)); Connection conn7 = DriverManager.getConnection(getUrl(), props); PreparedStatement stmt = conn7.prepareStatement( "upsert into " + @@ -96,11 +99,15 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { conn7.commit(); conn7.close(); - 
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+8)); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+80)); Connection conn8 = DriverManager.getConnection(getUrl(), props); analyzeTable(conn8, KEYONLY_NAME); + conn8.close(); + + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+90)); + Connection conn9 = DriverManager.getConnection(getUrl(), props); query = "SELECT i1 FROM KEYONLY"; - statement = conn8.prepareStatement(query); + statement = conn9.prepareStatement(query); rs = statement.executeQuery(); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); @@ -111,7 +118,7 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { assertFalse(rs.next()); query = "SELECT i1,s1 FROM KEYONLY"; - statement = conn8.prepareStatement(query); + statement = conn9.prepareStatement(query); rs = statement.executeQuery(); assertTrue(rs.next()); assertEquals(1, rs.getInt(1)); @@ -124,7 +131,7 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { assertEquals("foo", rs.getString(2)); assertFalse(rs.next()); - conn8.close(); + conn9.close(); } @Test @@ -134,9 +141,13 @@ public class KeyOnlyIT extends BaseClientManagedTimeIT { initTableValues(ts+1); Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+3)); + Connection conn3 = DriverManager.getConnection(getUrl(), props); + analyzeTable(conn3, KEYONLY_NAME); + conn3.close(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+5)); Connection conn5 = DriverManager.getConnection(getUrl(), props); - analyzeTable(conn5, KEYONLY_NAME); String query = "SELECT i1 FROM KEYONLY WHERE i1 < 2 or i1 = 3"; PreparedStatement statement = conn5.prepareStatement(query); ResultSet rs = statement.executeQuery(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java index 9f313ae..f4665b2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiCfQueryExecIT.java @@ -53,7 +53,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(3); // Must update config before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20l)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } protected static void initTableValues(long ts) throws Exception { @@ -102,7 +102,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { Connection conn = DriverManager.getConnection(url, props); try { initTableValues(ts); - analyzeTable(conn, "MULTI_CF"); + analyzeTable(getUrl(), ts + 3, "MULTI_CF"); PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue(rs.next()); @@ -122,7 +122,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { Connection conn = DriverManager.getConnection(url, props); try { initTableValues(ts); - analyzeTable(conn, "MULTI_CF"); + analyzeTable(getUrl(), ts + 3, "MULTI_CF"); PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue(rs.next()); @@ -143,7 +143,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { Connection conn = DriverManager.getConnection(url, props); try { initTableValues(ts); - analyzeTable(conn, "MULTI_CF"); + analyzeTable(getUrl(), ts + 3, "MULTI_CF"); PreparedStatement statement = 
conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue(rs.next()); @@ -164,7 +164,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(url, props); try { - analyzeTable(conn, "MULTI_CF"); + analyzeTable(getUrl(), ts + 3, "MULTI_CF"); PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue(rs.next()); @@ -193,7 +193,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { Connection conn = DriverManager.getConnection(url, props); try { initTableValues(ts); - analyzeTable(conn, "MULTI_CF"); + analyzeTable(getUrl(), ts + 3, "MULTI_CF"); PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue(rs.next()); @@ -210,25 +210,28 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { long ts = nextTimestamp(); initTableValues(ts); String ddl = "ALTER TABLE multi_cf ADD response_time BIGINT"; - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 3); + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 10); Connection conn = DriverManager.getConnection(url); conn.createStatement().execute(ddl); - analyzeTable(conn, "MULTI_CF"); conn.close(); + + analyzeTable(getUrl(), ts + 15, "MULTI_CF"); String dml = "upsert into " + "MULTI_CF(" + " ID, " + " RESPONSE_TIME)" + "VALUES ('000000000000003', 333)"; - url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 4); // Run query at timestamp 5 + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 20); conn = DriverManager.getConnection(url); conn.createStatement().execute(dml); conn.commit(); conn.close(); - analyzeTable(conn, "MULTI_CF"); + + analyzeTable(getUrl(), ts + 25, "MULTI_CF"); + String query = "SELECT ID,RESPONSE_TIME from multi_cf where RESPONSE_TIME = 
333"; - url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 5); // Run query at timestamp 5 + url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + (ts + 30); // Run query at timestamp 5 conn = DriverManager.getConnection(url); try { PreparedStatement statement = conn.prepareStatement(query); @@ -251,7 +254,7 @@ public class MultiCfQueryExecIT extends BaseClientManagedTimeIT { Connection conn = DriverManager.getConnection(url, props); try { initTableValues(ts); - analyzeTable(conn, "MULTI_CF"); + analyzeTable(getUrl(), ts + 3, "MULTI_CF"); PreparedStatement statement = conn.prepareStatement(query); ResultSet rs = statement.executeQuery(); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java index e48a938..5a1be0b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java @@ -63,7 +63,7 @@ public class ParallelIteratorsIT extends BaseHBaseManagedTimeIT { public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(3); // Must update config before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java index e20c11f..3c0d683 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorIT.java @@ -32,7 +32,7 @@ public class StatsCollectorIT extends BaseHBaseManagedTimeIT { public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(3); // Must update config before starting server - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(20l)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20l)); props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(frequency)); props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(20)); props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(20)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java index 8f7912a..f72cc3c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java @@ -58,7 +58,7 @@ public class SaltedIndexIT extends BaseIndexIT { // Drop the HBase table metadata for this test props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); // Don't put guideposts in - props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(10000000)); + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(10000000)); // 
Must update config before starting server setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java index d4a80a2..4373f47 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue; import java.io.PrintWriter; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; @@ -67,10 +66,23 @@ public class CsvBulkLoadToolIT { @AfterClass public static void tearDownAfterClass() throws Exception { - conn.close(); - PhoenixDriver.INSTANCE.close(); - hbaseTestUtil.shutdownMiniMapReduceCluster(); - hbaseTestUtil.shutdownMiniCluster(); + try { + conn.close(); + } finally { + try { + PhoenixDriver.INSTANCE.close(); + } finally { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + try { + hbaseTestUtil.shutdownMiniMapReduceCluster(); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + } + } } @Test http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 6a4f69b..4fe9a2b 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -60,12 +60,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; @@ -131,11 +129,10 @@ import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.schema.PhoenixArray; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.stat.PTableStats; -import org.apache.phoenix.schema.stat.PTableStatsImpl; +import org.apache.phoenix.schema.stat.StatisticsUtils; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; @@ -274,22 +271,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return PNameFactory.newName(keyBuffer, keyOffset, length); } - private static Scan newTableRowsScan(byte[] key) - throws IOException { - return newTableRowsScan(key, MetaDataProtocol.MIN_TABLE_TIMESTAMP, HConstants.LATEST_TIMESTAMP); - } - - private static Scan newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) - throws IOException { - Scan scan = new Scan(); - scan.setTimeRange(startTimeStamp, stopTimeStamp); - scan.setStartRow(key); - byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY); - ByteUtil.nextKey(stopKey, stopKey.length); - 
scan.setStopRow(stopKey); - return scan; - } - private RegionCoprocessorEnvironment env; private static final Log LOG = LogFactory.getLog(MetaDataEndpointImpl.class); @@ -417,20 +398,20 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return table; } // Query for the latest table first, since it's not cached - table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP); + table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientTimeStamp); if (table != null && table.getTimeStamp() < clientTimeStamp) { return table; } // Otherwise, query for an older version of the table - it won't be cached - return buildTable(key, cacheKey, region, clientTimeStamp); + return buildTable(key, cacheKey, region, clientTimeStamp, clientTimeStamp); } finally { rowLock.release(); } } private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, - long clientTimeStamp) throws IOException, SQLException { - Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); + long buildAsOfTimeStamp, long clientTimeStamp) throws IOException, SQLException { + Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, buildAsOfTimeStamp); RegionScanner scanner = region.getScanner(scan); Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); @@ -438,7 +419,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PTable oldTable = metaDataCache.getIfPresent(cacheKey); long tableTimeStamp = oldTable == null ? 
MIN_TABLE_TIMESTAMP-1 : oldTable.getTimeStamp(); PTable newTable; - newTable = getTable(scanner, clientTimeStamp, tableTimeStamp); + newTable = getTable(scanner, buildAsOfTimeStamp, tableTimeStamp, clientTimeStamp); if (newTable == null) { return null; } @@ -536,7 +517,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso columns.add(column); } - private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp) + private PTable getTable(RegionScanner scanner, long buildAsOfTimeStamp, long tableTimeStamp, long clientTimeStamp) throws IOException, SQLException { List<Cell> results = Lists.newArrayList(); scanner.next(results); @@ -671,7 +652,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if (colName.getString().isEmpty() && famName != null) { LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); if (linkType == LinkType.INDEX_TABLE) { - addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes); + addIndexToTable(tenantId, schemaName, famName, tableName, buildAsOfTimeStamp, indexes); } else if (linkType == LinkType.PHYSICAL_TABLE) { physicalTables.add(famName); } else { @@ -683,75 +664,23 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName( schemaName.getString(), tableName.getString())) : physicalTables.get(0); - PTableStats stats = tenantId == null ? updateStatsInternal(physicalTableName.getBytes()) : null; + PTableStats stats = tenantId == null ? 
StatisticsUtils.readStatistics( + ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME), + physicalTableName.getBytes(), + clientTimeStamp) : null; return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId, indexType, stats); } - private PTableStats updateStatsInternal(byte[] tableNameBytes) throws IOException { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - HTableInterface statsHTable = ServerUtil.getHTableForCoprocessorScan(env, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); - try { - Scan s = newTableRowsScan(tableNameBytes); - s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES); - ResultScanner scanner = statsHTable.getScanner(s); - Result result = null; - TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); - while ((result = scanner.next()) != null) { - CellScanner cellScanner = result.cellScanner(); - while (cellScanner.advance()) { - Cell current = cellScanner.current(); - int tableNameLength = tableNameBytes.length + 1; - int cfOffset = current.getRowOffset() + tableNameLength; - int cfLength = getVarCharLength(current.getRowArray(), cfOffset, current.getRowLength() - tableNameLength); - ptr.set(current.getRowArray(), cfOffset, cfLength); - byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr); - PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValueArray(), current.getValueOffset(), current - .getValueLength()); - if (array != null && array.getDimensions() != 0) { - List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions()); - for (int j = 0; j < array.getDimensions(); j++) { - byte[] gp = array.toBytes(j); - 
if (gp.length != 0) { - guidePosts.add(gp); - } - } - List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts); - if (gps != null) { // Add guidepost already there from other regions - guidePosts.addAll(gps); - } - } - } - } - if (!guidePostsPerCf.isEmpty()) { - // Sort guideposts, as the order above will depend on the order we traverse - // each region's worth of guideposts above. - for (List<byte[]> gps : guidePostsPerCf.values()) { - Collections.sort(gps, Bytes.BYTES_COMPARATOR); - } - return new PTableStatsImpl(guidePostsPerCf); - } - } catch (Exception e) { - if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) { - logger.warn("Stats table not yet online", e); - } else { - throw new IOException(e); - } - } finally { - statsHTable.close(); - } - return PTableStatsImpl.NO_STATS; - } - private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; } - Scan scan = newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP); + Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP); scan.setFilter(new FirstKeyOnlyFilter()); scan.setRaw(true); RegionScanner scanner = region.getScanner(scan); @@ -786,7 +715,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso Cache<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache(); PTable table = metaDataCache.getIfPresent(cacheKey); // We always cache the latest version - fault in if not in cache - if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) { + if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp, clientTimeStamp)) != null) { return table; } // if not found then check if newer table already exists and add delete marker for timestamp @@ -1086,7 +1015,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso // We always cache the latest version - fault in if not in cache if (table != null - || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) { + || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientTimeStamp)) != null) { if (table.getTimeStamp() < clientTimeStamp) { if (isTableDeleted(table) || tableType != table.getType()) { return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null); @@ -1111,7 +1040,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Since we don't allow back in time DDL, we know if we have a table it's the one // we want to delete. FIXME: we shouldn't need a scan here, but should be able to // use the table to generate the Delete markers. - Scan scan = newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); + Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp); RegionScanner scanner = region.getScanner(scan); List<Cell> results = Lists.newArrayList(); scanner.next(results); @@ -1243,7 +1172,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Get client timeStamp from mutations long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata); if (table == null - && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) { + && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientTimeStamp)) == null) { // if not found then call newerTableExists and add delete marker for timestamp // found if (buildDeletedTable(key, cacheKey, region, clientTimeStamp) != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 4ddb322..f7b5889 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -89,6 +89,7 @@ import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -155,10 +156,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { int offset = 0; HRegion region = c.getEnvironment().getRegion(); + long ts = scan.getTimeRange().getMax(); StatisticsCollector stats = null; if(ScanUtil.isAnalyzeTable(scan)) { // Let this throw, as this scan is being done for the sole purpose of collecting stats - stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString()); + stats = new StatisticsCollector(c.getEnvironment(), region.getRegionInfo().getTable().getNameAsString(), ts); } if (ScanUtil.isLocalIndex(scan)) { /* @@ -225,7 +227,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ innerScanner = theScanner; int batchSize = 0; - long ts = scan.getTimeRange().getMax(); List<Mutation> mutations = Collections.emptyList(); boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; if (isDelete || isUpsert || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { @@ -457,8 +458,10 @@ public class 
UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME) && scanType.equals(ScanType.COMPACT_DROP_DELETES)) { try { - // TODO: when does this get closed? - StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString()); + // TODO: for users that manage timestamps themselves, we should provide + // a means of specifying/getting this. + long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); + StatisticsCollector stats = new StatisticsCollector(c.getEnvironment(), table.getNameAsString(), clientTimeStamp); internalScan = stats.createCompactionScanner(c.getEnvironment().getRegion(), store, scanners, scanType, earliestPutTs, s); } catch (IOException e) { @@ -481,7 +484,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ if (!table.getNameAsString().equals(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME)) { StatisticsCollector stats = null; try { - stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString()); + // TODO: for users that manage timestamps themselves, we should provide + // a means of specifying/getting this. 
+ long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); + stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp); stats.collectStatsDuringSplit(e.getEnvironment().getConfiguration(), l, r, region); } catch (IOException ioe) { if(logger.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index cc2c7a3..dc92183 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -70,7 +70,6 @@ public interface QueryServices extends SQLCloseable { public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes"; public static final String DATE_FORMAT_ATTRIB = "phoenix.query.dateFormat"; public static final String NUMBER_FORMAT_ATTRIB = "phoenix.query.numberFormat"; - public static final String STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.query.statsUpdateFrequency"; public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin"; public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; @@ -123,7 +122,7 @@ public interface QueryServices extends SQLCloseable { public static final String MAX_INDEX_PRIOIRTY_ATTRIB = "phoenix.regionserver.index.priority.max"; public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.regionserver.index.handler.count"; - // Constants for for configuring tracing + // Config parameters for configuring tracing public static final String TRACING_FREQ_ATTRIB = "phoenix.trace.frequency"; public static final String
TRACING_PAGE_SIZE_ATTRIB = "phoenix.trace.read.pagesize"; public static final String TRACING_PROBABILITY_THRESHOLD_ATTRIB = "phoenix.trace.probability.threshold"; @@ -132,7 +131,10 @@ public interface QueryServices extends SQLCloseable { public static final String USE_REVERSE_SCAN_ATTRIB = "phoenix.query.useReverseScan"; - public static final String HISTOGRAM_BYTE_DEPTH_ATTRIB = "phoenix.guidepost.width"; + // Config parameters for stats collection + public static final String STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.stats.updateFrequency"; + public static final String MIN_STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.stats.minUpdateFrequency"; + public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width"; /** * Get executor service used for parallel scans http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index e3fefa8..06ea1d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -24,7 +24,6 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; -import static org.apache.phoenix.query.QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB; import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB; import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB; import static 
org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; @@ -38,6 +37,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; +import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; @@ -49,6 +49,7 @@ import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY; import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; @@ -84,8 +85,6 @@ public class QueryServicesOptions { public static final int DEFAULT_TARGET_QUERY_CONCURRENCY = 32; public static final int DEFAULT_MAX_QUERY_CONCURRENCY = 64; public static final String DEFAULT_DATE_FORMAT = DateUtil.DEFAULT_DATE_FORMAT; - public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min - public static final int DEFAULT_MAX_STATS_AGE_MS = 24 * 60 * 60000; // 1 day public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true; public static final int DEFAULT_MAX_MUTATION_SIZE = 500000; public static final boolean 
DEFAULT_ROW_KEY_ORDER_SALTED_TABLE = true; // Merge sort on client to ensure salted tables are row key ordered @@ -144,7 +143,10 @@ public class QueryServicesOptions { public static final String DEFAULT_TRACING_STATS_TABLE_NAME = "SYSTEM.TRACING_STATS"; public static final String DEFAULT_TRACING_FREQ = Tracing.Frequency.NEVER.getKey(); public static final double DEFAULT_TRACING_PROBABILITY_THRESHOLD = 0.05; - public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 1024 * 1024 * 30; + + public static final long DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE = 1024 * 1024 * 30; + public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 15 * 60000; // 15min + public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = DEFAULT_STATS_UPDATE_FREQ_MS/2; public static final boolean DEFAULT_USE_REVERSE_SCAN = true; @@ -201,7 +203,7 @@ public class QueryServicesOptions { .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES) .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE) .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE) - .setIfUnset(HISTOGRAM_BYTE_DEPTH_ATTRIB, DEFAULT_HISTOGRAM_BYTE_DEPTH); + .setIfUnset(STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE); ; // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set @@ -296,10 +298,6 @@ public class QueryServicesOptions { return set(DATE_FORMAT_ATTRIB, dateFormat); } - public QueryServicesOptions setStatsUpdateFrequencyMs(int frequencyMs) { - return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); - } - public QueryServicesOptions setCallQueueRoundRobin(boolean isRoundRobin) { return set(CALL_QUEUE_PRODUCER_ATTRIB_NAME, isRoundRobin); } @@ -437,7 +435,15 @@ public class QueryServicesOptions { return set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, walEditCodec); } - public QueryServicesOptions setHistogramByteDepth(long byteDepth) { - return set(HISTOGRAM_BYTE_DEPTH_ATTRIB, byteDepth); + public QueryServicesOptions setStatsHistogramDepthBytes(long byteDepth) { + return set(STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, byteDepth); + } + + public QueryServicesOptions setStatsUpdateFrequencyMs(int frequencyMs) { + return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); } + + public QueryServicesOptions setMinStatsUpdateFrequencyMs(int frequencyMs) { + return set(MIN_STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 469f8fe..c4bfebb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -481,7 +481,7 @@ public class MetaDataClient { public MutationState updateStatistics(UpdateStatisticsStatement updateStatisticsStmt) throws SQLException { // Check before updating the stats if we have reached the configured time to reupdate the stats once again long msMinBetweenUpdates = 
connection.getQueryServices().getProps() - .getLong(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS); + .getLong(QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_MIN_STATS_UPDATE_FREQ_MS); ColumnResolver resolver = FromCompiler.getResolver(updateStatisticsStmt, connection); PTable table = resolver.getTables().get(0).getTable(); PName physicalName = table.getPhysicalName(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java index bb05a32..0724d6a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsCollector.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -71,15 +72,14 @@ public class StatisticsCollector { // Ensures that either analyze or compaction happens at any point of time. 
private static final Log LOG = LogFactory.getLog(StatisticsCollector.class); - public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName) throws IOException { + public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException { guidepostDepth = - env.getConfiguration().getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, - QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); + env.getConfiguration().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_HISTOGRAM_DEPTH_BYTE); // Get the stats table associated with the current table on which the CP is // triggered - this.statsTable = StatisticsTable.getStatisticsTable( - env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES))); - this.statsTable.commitLastStatsUpdatedTime(tableName, TimeKeeper.SYSTEM.getCurrentTime()); + HTableInterface statsHTable = env.getTable(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES)); + this.statsTable = StatisticsTable.getStatisticsTable(statsHTable, tableName, clientTimeStamp); } public void close() throws IOException { @@ -106,19 +106,18 @@ public class StatisticsCollector { try { // update the statistics table for (ImmutableBytesPtr fam : familyMap.keySet()) { - String tableName = region.getRegionInfo().getTable().getNameAsString(); if (delete) { if(LOG.isDebugEnabled()) { LOG.debug("Deleting the stats for the region "+region.getRegionInfo()); } - statsTable.deleteStats(tableName, region.getRegionInfo().getRegionNameAsString(), this, - Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + statsTable.deleteStats(region.getRegionInfo().getRegionNameAsString(), this, Bytes.toString(fam.copyBytesIfNecessary()), + mutations); } if(LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region "+region.getRegionInfo()); } - statsTable.addStats(tableName, (region.getRegionInfo().getRegionNameAsString()), this, - 
Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + statsTable.addStats((region.getRegionInfo().getRegionNameAsString()), this, Bytes.toString(fam.copyBytesIfNecessary()), + mutations); } } catch (IOException e) { LOG.error("Failed to update statistics table!", e); @@ -132,12 +131,11 @@ public class StatisticsCollector { private void deleteStatsFromStatsTable(final HRegion region, List<Mutation> mutations, long currentTime) throws IOException { try { - String tableName = region.getRegionInfo().getTable().getNameAsString(); String regionName = region.getRegionInfo().getRegionNameAsString(); // update the statistics table for (ImmutableBytesPtr fam : familyMap.keySet()) { - statsTable.deleteStats(tableName, regionName, this, - Bytes.toString(fam.copyBytesIfNecessary()), mutations, currentTime); + statsTable.deleteStats(regionName, this, Bytes.toString(fam.copyBytesIfNecessary()), + mutations); } } catch (IOException e) { LOG.error("Failed to delete from statistics table!", e); @@ -274,6 +272,7 @@ public class StatisticsCollector { } public void updateStatistic(KeyValue kv) { + @SuppressWarnings("deprecation") byte[] cf = kv.getFamily(); familyMap.put(new ImmutableBytesPtr(cf), true); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java index 79f64fc..ce3d47b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsScanner.java @@ -21,7 +21,6 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; 
import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TimeKeeper; /** @@ -81,21 +80,18 @@ public class StatisticsScanner implements InternalScanner { try { // update the statistics table // Just verify if this if fine - String tableName = SchemaUtil.getTableNameFromFullName(region.getRegionInfo().getTable().getNameAsString()); ArrayList<Mutation> mutations = new ArrayList<Mutation>(); long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); if (LOG.isDebugEnabled()) { LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.deleteStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), - mutations, currentTime); + stats.deleteStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations); if (LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - stats.addStats(tableName, region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations, - currentTime); + stats.addStats(region.getRegionNameAsString(), this.tracker, Bytes.toString(family), mutations); if (LOG.isDebugEnabled()) { LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java index ebaa978..fba8df7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsTable.java @@ -23,6 +23,7 @@ 
import java.sql.Date; import java.util.List; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -36,7 +37,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.TrustedByteArrayOutputStream; +import org.apache.phoenix.util.TimeKeeper; import com.google.protobuf.ServiceException; @@ -45,6 +46,8 @@ import com.google.protobuf.ServiceException; */ public class StatisticsTable implements Closeable { /** + * @param tableName name of the table whose stats are stored; its VARCHAR bytes form the leading part of each stats row key + * @param clientTimeStamp timestamp applied to the stats cells; HConstants.LATEST_TIMESTAMP is replaced with the current system time * @param Configuration * Configruation to update the stats table. * @param primaryTableName @@ -53,14 +56,23 @@ public class StatisticsTable implements Closeable { * @throws IOException * if the table cannot be created due to an underlying HTable creation error */ - public static StatisticsTable getStatisticsTable(HTableInterface hTable) throws IOException { - return new StatisticsTable(hTable); + public static StatisticsTable getStatisticsTable(HTableInterface hTable, String tableName, long clientTimeStamp) throws IOException { + if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { + clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); + } + StatisticsTable statsTable = new StatisticsTable(hTable, tableName, clientTimeStamp); + statsTable.commitLastStatsUpdatedTime(); + return statsTable; } private final HTableInterface statisticsTable; + private final byte[] tableName; + private final long clientTimeStamp; - public StatisticsTable(HTableInterface statsTable) { + private StatisticsTable(HTableInterface statsTable, String tableName, long clientTimeStamp) { this.statisticsTable = statsTable; + this.tableName = PDataType.VARCHAR.toBytes(tableName); +
this.clientTimeStamp = clientTimeStamp; } /** @@ -75,28 +87,35 @@ public class StatisticsTable implements Closeable { * Update a list of statistics for a given region. If the ANALYZE <tablename> query is issued * then we use Upsert queries to update the table * If the region gets splitted or the major compaction happens we update using HTable.put() + * @param tracker - the statistics tracker + * @param fam - the family for which the stats is getting collected. + * @param mutations - list of mutations that collects all the mutations to commit in a batch * @param tablekey - The table name * @param schemaName - the schema name associated with the table * @param region name - the region of the table for which the stats are collected - * @param tracker - the statistics tracker - * @param fam - the family for which the stats is getting collected. * @param split - if the updation is caused due to a split - * @param mutations - list of mutations that collects all the mutations to commit in a batch - * @param currentTime - the current time * @throws IOException * if we fail to do any of the puts. 
Any single failure will prevent any future attempts for the remaining list of stats to * update */ - public void addStats(String tableName, String regionName, StatisticsCollector tracker, String fam, - List<Mutation> mutations, long currentTime) throws IOException { + public void addStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { if (tracker == null) { return; } - // Add the timestamp header - commitLastStatsUpdatedTime(tableName, currentTime); - - byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), + byte[] prefix = StatisticsUtils.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam), PDataType.VARCHAR.toBytes(regionName)); - formStatsUpdateMutation(tracker, fam, mutations, currentTime, prefix); + Put put = new Put(prefix); + if (tracker.getGuidePosts(fam) != null) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES, + clientTimeStamp, (tracker.getGuidePosts(fam))); + } + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES, + clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, + clientTimeStamp, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); + // Add our empty column value so queries behave correctly + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, + clientTimeStamp, ByteUtil.EMPTY_BYTE_ARRAY); + mutations.add(put); } private static MutationType getMutationType(Mutation m) throws IOException { @@ -128,44 +147,21 @@ public class StatisticsTable implements Closeable { } } - private void formStatsUpdateMutation(StatisticsCollector tracker, String fam, List<Mutation> mutations, - long currentTime, byte[] prefix) { - Put put = new Put(prefix, currentTime); - if (tracker.getGuidePosts(fam) != null) { - 
put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES, - currentTime, (tracker.getGuidePosts(fam))); - } - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MIN_KEY_BYTES, - currentTime, PDataType.VARBINARY.toBytes(tracker.getMinKey(fam))); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.MAX_KEY_BYTES, - currentTime, PDataType.VARBINARY.toBytes(tracker.getMaxKey(fam))); - // Add our empty column value so queries behave correctly - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, - currentTime, ByteUtil.EMPTY_BYTE_ARRAY); - mutations.add(put); - } - - public static byte[] getRowKeyForTSUpdate(byte[] table) throws IOException { - // always starts with the source table - TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length); - os.write(table); - os.close(); - return os.getBuffer(); - } - - public void commitLastStatsUpdatedTime(String tableName, long currentTime) throws IOException { - byte[] prefix = PDataType.VARCHAR.toBytes(tableName); + private void commitLastStatsUpdatedTime() throws IOException { + // Always use wallclock time for this, as it's a mechanism to prevent + // stats from being collected too often. 
+ long currentTime = TimeKeeper.SYSTEM.getCurrentTime(); + byte[] prefix = tableName; Put put = new Put(prefix); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, currentTime, + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME_BYTES, clientTimeStamp, PDataType.DATE.toBytes(new Date(currentTime))); statisticsTable.put(put); } - public void deleteStats(String tableName, String regionName, StatisticsCollector tracker, String fam, - List<Mutation> mutations, long currentTime) + public void deleteStats(String regionName, StatisticsCollector tracker, String fam, List<Mutation> mutations) throws IOException { - byte[] prefix = StatisticsUtils.getRowKey(PDataType.VARCHAR.toBytes(tableName), PDataType.VARCHAR.toBytes(fam), + byte[] prefix = StatisticsUtils.getRowKey(tableName, PDataType.VARCHAR.toBytes(fam), PDataType.VARCHAR.toBytes(regionName)); - mutations.add(new Delete(prefix, currentTime - 1)); + mutations.add(new Delete(prefix, clientTimeStamp - 1)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java index 8b6d7fc..b45dfbf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stat/StatisticsUtils.java @@ -16,17 +16,39 @@ * limitations under the License. 
*/ package org.apache.phoenix.schema.stat; +import static org.apache.phoenix.util.SchemaUtil.getVarCharLength; + import java.io.IOException; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.TreeMap; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.util.TrustedByteArrayOutputStream; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PhoenixArray; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.MetaDataUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; /** * Simple utility class for managing multiple key parts of the statistic */ public class StatisticsUtils { + private static final Logger logger = LoggerFactory.getLogger(StatisticsUtils.class); private StatisticsUtils() { // private ctor for utility classes @@ -35,38 +57,75 @@ public class StatisticsUtils { /** Number of parts in our complex key */ protected static final int NUM_KEY_PARTS = 3; - public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) throws IOException { + public static byte[] getRowKey(byte[] table, byte[] fam, byte[] region) { // always starts with the source table - TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(table.length + region.length - + fam.length + (NUM_KEY_PARTS - 1)); - os.write(table); - os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); - 
os.write(fam); - os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); - os.write(region); - os.close(); - return os.getBuffer(); + byte[] rowKey = new byte[table.length + fam.length + region.length + 2]; + int offset = 0; + System.arraycopy(table, 0, rowKey, offset, table.length); + offset += table.length; + rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; + System.arraycopy(fam, 0, rowKey, offset, fam.length); + offset += fam.length; + rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; + System.arraycopy(region, 0, rowKey, offset, region.length); + return rowKey; } - public static byte[] getCFFromRowKey(byte[] table, byte[] row, int rowOffset, int rowLength) { - // Move over the the sepeartor byte that would be written after the table name - int startOff = Bytes.indexOf(row, table) + (table.length) + 1; - int endOff = startOff; - while (endOff < rowLength) { - // Check for next seperator byte - if (row[endOff] != QueryConstants.SEPARATOR_BYTE) { - endOff++; + public static byte[] copyRow(KeyValue kv) { + return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength()); + } + + public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp) throws IOException { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + try { + Scan s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp); + s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES); + ResultScanner scanner = statsHTable.getScanner(s); + Result result = null; + TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); + while ((result = scanner.next()) != null) { + CellScanner cellScanner = result.cellScanner(); + while (cellScanner.advance()) { + Cell current = cellScanner.current(); + int tableNameLength = tableNameBytes.length + 1; + int cfOffset = current.getRowOffset() + 
tableNameLength; + int cfLength = getVarCharLength(current.getRowArray(), cfOffset, current.getRowLength() - tableNameLength); + ptr.set(current.getRowArray(), cfOffset, cfLength); + byte[] cfName = ByteUtil.copyKeyBytesIfNecessary(ptr); + PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(current.getValueArray(), current.getValueOffset(), current + .getValueLength()); + if (array != null && array.getDimensions() != 0) { + List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(array.getDimensions()); + for (int j = 0; j < array.getDimensions(); j++) { + byte[] gp = array.toBytes(j); + if (gp.length != 0) { + guidePosts.add(gp); + } + } + List<byte[]> gps = guidePostsPerCf.put(cfName, guidePosts); + if (gps != null) { // Add guidepost already there from other regions + guidePosts.addAll(gps); + } + } + } + } + if (!guidePostsPerCf.isEmpty()) { + // Sort guideposts, as the order above will depend on the order we traverse + // each region's worth of guideposts above. 
+ for (List<byte[]> gps : guidePostsPerCf.values()) { + Collections.sort(gps, Bytes.BYTES_COMPARATOR); + } + return new PTableStatsImpl(guidePostsPerCf); + } + } catch (Exception e) { + if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) { + logger.warn("Stats table not yet online", e); } else { - break; + throw new IOException(e); } + } finally { + statsHTable.close(); } - int cfLength = endOff - startOff; - byte[] cf = new byte[cfLength]; - System.arraycopy(row, startOff, cf, 0, cfLength); - return cf; - } - - public static byte[] copyRow(KeyValue kv) { - return Arrays.copyOfRange(kv.getRowArray(), kv.getRowOffset(), kv.getRowOffset() + kv.getRowLength()); + return PTableStatsImpl.NO_STATS; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java index 545394d..464e87d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/MetaDataUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -406,4 +407,15 @@ public class MetaDataUtil { public static final String IS_LOCAL_INDEX_TABLE_PROP_NAME = "IS_LOCAL_INDEX_TABLE"; public static final byte[] IS_LOCAL_INDEX_TABLE_PROP_BYTES = Bytes.toBytes(IS_LOCAL_INDEX_TABLE_PROP_NAME); + + public static Scan 
newTableRowsScan(byte[] key, long startTimeStamp, long stopTimeStamp) + throws IOException { + Scan scan = new Scan(); + scan.setTimeRange(startTimeStamp, stopTimeStamp); + scan.setStartRow(key); + byte[] stopKey = ByteUtil.concat(key, QueryConstants.SEPARATOR_BYTE_ARRAY); + ByteUtil.nextKey(stopKey, stopKey.length); + scan.setStopRow(stopKey); + return scan; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 1844edb..96257f6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -599,7 +599,7 @@ public abstract class BaseTest { conf.setInt("hbase.hlog.asyncer.number", 2); conf.setInt("hbase.assignment.zkevent.workers", 5); conf.setInt("hbase.assignment.threads.max", 5); - conf.setInt(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, 20); + conf.setInt(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 20); return conf; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 5f33537..3151588 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -51,8 +51,9 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static final String DEFAULT_WAL_EDIT_CODEC = IndexedWALEditCodec.class.getName(); 
public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*4L; // 4 Mb public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*2L; // 2 Mb - public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 2000; - public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 0; + public static final long DEFAULT_STATS_HISTOGRAM_DEPTH_BYTES = 2000; + public static final int DEFAULT_STATS_UPDATE_FREQ_MS = 10000; + public static final int DEFAULT_MIN_STATS_UPDATE_FREQ_MS = 0; public QueryServicesTestImpl(ReadOnlyProps defaultProps) { @@ -62,7 +63,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static QueryServicesOptions getDefaultServicesOptions() { return withDefaults() .setStatsUpdateFrequencyMs(DEFAULT_STATS_UPDATE_FREQ_MS) - .setHistogramByteDepth(DEFAULT_HISTOGRAM_BYTE_DEPTH) + .setMinStatsUpdateFrequencyMs(DEFAULT_MIN_STATS_UPDATE_FREQ_MS) + .setStatsHistogramDepthBytes(DEFAULT_STATS_HISTOGRAM_DEPTH_BYTES) .setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE) .setQueueSize(DEFAULT_QUEUE_SIZE) .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC) http://git-wip-us.apache.org/repos/asf/phoenix/blob/5668817d/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 4b25992..4c33660 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -31,6 +31,7 @@ import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.sql.Connection; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; @@ -495,4 +496,16 @@ public class TestUtil { String query = "ANALYZE " + tableName; conn.createStatement().execute(query); } + + 
public static void analyzeTable(String url, long ts, String tableName) throws IOException, SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + analyzeTable(url, props, tableName); + } + + public static void analyzeTable(String url, Properties props, String tableName) throws IOException, SQLException { + Connection conn = DriverManager.getConnection(url, props); + analyzeTable(conn, tableName); + conn.close(); + } }
