http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java index b269cd1..3703af9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SaltedIndexIT.java @@ -53,12 +53,13 @@ public class SaltedIndexIT extends BaseIndexIT { @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) public static void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(3); - // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan - props.put(QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, Integer.toString(1)); // Forces server cache to be used props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2)); // Drop the HBase table metadata for this test props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + // Don't put guideposts in + // TODO: ovrides are not being correctly propagated into configs, but when + // they are we can add this: props.put(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, Long.toString(10000000)); // Must update config before starting server setUpTestDriver(getUrl(), new ReadOnlyProps(props.entrySet().iterator())); } @@ -98,6 +99,12 @@ public class SaltedIndexIT extends BaseIndexIT { testMutableTableIndexMaintanence(null, null); } + private void assertQueryPlan(String expectedPrefix, String expectedPostfix, String actual) { + assertTrue("Expected query plan to start with [" + expectedPrefix + "], but got [" + actual + "]", + actual.startsWith(expectedPrefix)); + assertTrue("Expected query plan to end with [" + expectedPostfix + "], but got [" + actual + "]", actual.endsWith(expectedPostfix));; + } + private void testMutableTableIndexMaintanence(Integer tableSaltBuckets, Integer indexSaltBuckets) throws Exception { String query; ResultSet rs; @@ -123,7 +130,9 @@ public class SaltedIndexIT extends BaseIndexIT { stmt.setString(2, "y"); stmt.execute(); conn.commit(); - + stmt = conn.prepareStatement("ANALYZE "+DATA_TABLE_FULL_NAME); + stmt.execute(); + query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); @@ -177,13 +186,15 @@ public class SaltedIndexIT extends BaseIndexIT { assertFalse(rs.next()); rs = conn.createStatement().executeQuery("EXPLAIN " + query); expectedPlan = tableSaltBuckets == null ? - "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" + + "CLIENT PARALLEL 3-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" + " SERVER SORTED BY [V]\n" + "CLIENT MERGE SORT" : - "CLIENT PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" + + "CLIENT PARALLEL 4-WAY POINT LOOKUP ON 1 KEY OVER " + DATA_TABLE_FULL_NAME + "\n" + " SERVER SORTED BY [V]\n" + "CLIENT MERGE SORT"; - assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs)); + String explainPlan2 = QueryUtil.getExplainPlan(rs); + String prefix = "CLIENT PARALLEL "; + assertQueryPlan(prefix, expectedPlan.substring(prefix.length()+1),explainPlan2); // Will use data table now, since there's a LIMIT clause and // we're able to optimize out the ORDER BY, unless the data @@ -199,15 +210,16 @@ public class SaltedIndexIT extends BaseIndexIT { assertFalse(rs.next()); rs = conn.createStatement().executeQuery("EXPLAIN " + query); expectedPlan = tableSaltBuckets == null ? - "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" + + "CLIENT PARALLEL 3-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" + " SERVER FILTER BY V >= 'x'\n" + " SERVER 2 ROW LIMIT\n" + "CLIENT 2 ROW LIMIT" : - "CLIENT PARALLEL 3-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" + + "CLIENT PARALLEL 6-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME + "\n" + " SERVER FILTER BY V >= 'x'\n" + " SERVER 2 ROW LIMIT\n" + "CLIENT MERGE SORT\n" + "CLIENT 2 ROW LIMIT"; - assertEquals(expectedPlan,QueryUtil.getExplainPlan(rs)); + String explainPlan = QueryUtil.getExplainPlan(rs); + assertQueryPlan(prefix, expectedPlan.substring(prefix.length()+1),explainPlan); } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java index e6fe7d3..690b15c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableUpsertSelectIT.java @@ -23,10 +23,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Properties; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; @@ -94,12 +96,12 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT { stmt.setInt(2, 1); stmt.execute(); conn.commit(); - query = "UPSERT INTO target(pk, col) SELECT pk, col from source"; stmt = conn.prepareStatement(query); stmt.execute(); conn.commit(); - + analyzeTable(conn, "source"); + analyzeTable(conn, "target"); query = "SELECT * FROM target"; stmt = conn.prepareStatement(query); ResultSet rs = stmt.executeQuery(); @@ -111,6 +113,11 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT { conn.close(); } } + + private void analyzeTable(Connection conn, String tableName) throws IOException, SQLException { + String query = "ANALYZE " + tableName; + conn.createStatement().execute(query); + } @Test public void testUpsertSaltedTableIntoSaltedTable() throws Exception { @@ -188,12 +195,12 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT { Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(false); try { - String ddl = "CREATE TABLE IF NOT EXISTS source" + + String ddl = "CREATE TABLE IF NOT EXISTS source1" + " (pk1 varchar NULL, pk2 varchar NULL, pk3 integer NOT NULL, col1 INTEGER" + " CONSTRAINT pk PRIMARY KEY (pk1, pk2, pk3)) SALT_BUCKETS=4"; createTestTable(getUrl(), ddl); - String query = "UPSERT INTO source(pk1, pk2, pk3, col1) VALUES(?,?,?,?)"; + String query = "UPSERT INTO source1(pk1, pk2, pk3, col1) VALUES(?,?,?,?)"; PreparedStatement stmt = conn.prepareStatement(query); stmt.setString(1, "1"); stmt.setString(2, "2"); @@ -203,12 +210,12 @@ public class SaltedTableUpsertSelectIT extends BaseHBaseManagedTimeIT { conn.commit(); conn.setAutoCommit(true); - query = "UPSERT INTO source(pk3, col1, pk1) SELECT pk3+1, col1+1, pk2 from source"; + query = "UPSERT INTO source1(pk3, col1, pk1) SELECT pk3+1, col1+1, pk2 from source1"; stmt = conn.prepareStatement(query); stmt.execute(); conn.commit(); - - query = "SELECT col1 FROM source"; + analyzeTable(conn, "source1"); + query = "SELECT col1 FROM source1"; stmt = conn.prepareStatement(query); ResultSet rs = stmt.executeQuery(); assertTrue(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index a900628..f4c43d3 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -1,6 +1,4 @@ /** - * Copyright 2010 The Apache Software Foundation - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -105,12 +103,12 @@ tokens MAXVALUE='maxvalue'; CYCLE='cycle'; CASCADE='cascade'; + ANALYZE='analyze'; } @parser::header { /** - * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -357,6 +355,7 @@ non_select_node returns [BindableStatement ret] | s=alter_table_node | s=create_sequence_node | s=drop_sequence_node + | s=update_statistics_node | s=explain_node) { contextStack.pop(); $ret = s; } ; @@ -493,6 +492,11 @@ alter_table_node returns [AlterTableStatement ret] { PTableType tt = v==null ? (QueryConstants.SYSTEM_SCHEMA_NAME.equals(t.getSchemaName()) ? PTableType.SYSTEM : PTableType.TABLE) : PTableType.VIEW; ret = ( c == null ? factory.addColumn(factory.namedTable(null,t), tt, d, ex!=null, p) : factory.dropColumn(factory.namedTable(null,t), tt, c, ex!=null) ); } ; +update_statistics_node returns [UpdateStatisticsStatement ret] + : ANALYZE t=from_table_name + {ret = factory.updateStatistics(factory.namedTable(null, t));} + ; + prop_name returns [String ret] : p=identifier {$ret = SchemaUtil.normalizeIdentifier(p); } ; http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java index a36db0b..fcef0ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java @@ -72,10 +72,13 @@ public class GlobalCache extends TenantCacheImpl { synchronized(this) { result = metaDataCache; if(result == null) { + long maxTTL = Math.min(config.getLong( + QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS), config.getLong( + QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS)); long maxSize = config.getLong(QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE); - long maxTTL = config.getLong(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, - QueryServicesOptions.DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS); metaDataCache = result = CacheBuilder.newBuilder() .maximumWeight(maxSize) .expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS) http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 48970c8..5f04fa0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -40,7 +40,6 @@ import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.schema.AmbiguousColumnException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 9af606e..c05ef30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -59,11 +59,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.TreeMap; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -104,8 +105,12 @@ import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.PhoenixArray; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.stat.PTableStats; +import org.apache.phoenix.schema.stat.PTableStatsImpl; +import org.apache.phoenix.schema.stat.StatisticsUtils; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; @@ -445,11 +450,85 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null); } } - - return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, saltBucketNum, columns, - tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, viewIndexId); + PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getTableName( + schemaName.getString(), tableName.getString())) : physicalTables.get(0); + PTableStats stats = updateStatsInternal(physicalTableName.getBytes(), columns); + return PTableImpl + .makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum, pkName, + saltBucketNum, columns, tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, + physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, viewType, + viewIndexId, stats); + } + + private PTableStats updateStatsInternal(byte[] tableNameBytes, List<PColumn> columns) throws IOException { + List<PName> family = Lists.newArrayListWithExpectedSize(columns.size()); + for (PColumn column : columns) { + PName familyName = column.getFamilyName(); + if (familyName != null) { + family.add(familyName); + } + } + HTable statsHTable = null; + try { + // Can we do a new HTable instance here? Or get it from a pool or cache of these instances? + statsHTable = new HTable(getEnvironment().getConfiguration(), PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES); + Scan s = new Scan(); + if (tableNameBytes != null) { + // Check for an efficient way here + s.setStartRow(tableNameBytes); + s.setStopRow(ByteUtil.nextKey(tableNameBytes)); + } + ResultScanner scanner = statsHTable.getScanner(s); + Result result = null; + byte[] fam = null; + List<byte[]> guidePosts = Lists.newArrayListWithExpectedSize(columns.size()); + TreeMap<byte[], List<byte[]>> guidePostsPerCf = new TreeMap<byte[], List<byte[]>>(Bytes.BYTES_COMPARATOR); + while ((result = scanner.next()) != null) { + KeyValue[] kvs = result.raw(); + for(KeyValue kv : kvs) { + // For now collect only guide posts + if (Bytes.equals(kv.getQualifier(), PhoenixDatabaseMetaData.GUIDE_POSTS_BYTES)) { + byte[] cfInCell = StatisticsUtils.getCFFromRowKey(tableNameBytes, kv.getRow()); + if (fam == null) { + fam = cfInCell; + } else if (!Bytes.equals(fam, cfInCell)) { + // Sort all the guide posts + guidePostsPerCf.put(cfInCell, guidePosts); + guidePosts = new ArrayList<byte[]>(); + fam = cfInCell; + } + byte[] guidePostVal = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), + kv.getValueLength()).copyBytesIfNecessary(); + PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePostVal); + if (array != null && array.getDimensions() != 0) { + for (int j = 0; j < array.getDimensions(); j++) { + byte[] gp = array.toBytes(j); + if (gp.length != 0) { + guidePosts.add(gp); + } + } + } + } + } + } + if (fam != null) { + // Sort all the guideposts + guidePostsPerCf.put(fam, guidePosts); + } + return new PTableStatsImpl(guidePostsPerCf); + } catch (Exception e) { + if (e instanceof org.apache.hadoop.hbase.TableNotFoundException) { + logger.warn("Stats table not yet online", e); + } else { + throw new IOException(e); + } + } finally { + if (statsHTable != null) { + statsHTable.close(); + } + } + return PTableStatsImpl.NO_STATS; } - private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException { if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) { return null; @@ -1337,11 +1416,41 @@ public class MetaDataEndpointImpl extends BaseEndpointCoprocessor implements Met return results.size() > 0 && !allViewsNotInSingleRegion; } - /** + /**o * Returns true is the table has views and they are all NOT in the same HBase region. */ private boolean allViewsInMultipleRegions() { return results.size() > 0 && allViewsNotInSingleRegion; } } + + @Override + public void clearCacheForTable(byte[] tenantId, byte[] schema, byte[] tableName, final long clientTS) + throws IOException { + byte[] tableKey = SchemaUtil.getTableKey(tenantId, schema, tableName); + ImmutableBytesPtr key = new ImmutableBytesPtr(tableKey); + try { + PTable table = doGetTable(tableKey, clientTS); + if (table != null) { + Cache<ImmutableBytesPtr, PTable> metaDataCache = GlobalCache.getInstance(getEnvironment()).getMetaDataCache(); + // Add +1 to the ts + // TODO : refactor doGetTable() to do this - optionally increment the timestamp + // TODO : clear metadata if it is spread across multiple region servers + long ts = table.getTimeStamp() + 1; + // Here we could add an empty puti + HRegion region = getEnvironment().getRegion(); + List<Mutation> mutations = new ArrayList<Mutation>(); + Put p = new Put(tableKey); + p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ts, ByteUtil.EMPTY_BYTE_ARRAY); + mutations.add(p); + region.mutateRowsWithLocks(mutations, Collections.<byte[]> emptySet()); + metaDataCache.invalidate(key); + } + } catch (Throwable t) { + // We could still invalidate it + logger.error("clearCacheForTable failed to update the latest ts ", t); + throw new IOException(t); + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 351662b..66fb1ac 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -267,6 +267,9 @@ public interface MetaDataProtocol extends CoprocessorProtocol { */ void clearCache(); + void clearCacheForTable(final byte[] tenantID, final byte[] schema, final byte[] tableName, final long clientTS) + throws IOException; + /** * Get the version of the server-side HBase and phoenix.jar. Used when initially connecting * to a cluster to ensure that the client and server jars are compatible. http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 768785b..9864218 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -43,7 +43,6 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; @@ -271,7 +270,6 @@ public class MutationState implements SQLCloseable { private long[] validate() throws SQLException { int i = 0; Long scn = connection.getSCN(); - PName tenantId = connection.getTenantId(); MetaDataClient client = new MetaDataClient(connection); long[] timeStamps = new long[this.mutations.size()]; for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java index 92756a5..7e76234 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RoundDecimalExpression.java @@ -17,30 +17,30 @@ */ package org.apache.phoenix.expression.function; +import static org.apache.phoenix.schema.PDataType.DECIMAL; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.math.BigDecimal; import java.math.RoundingMode; import java.sql.SQLException; +import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.compile.KeyPart; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.schema.tuple.Tuple; import com.google.common.collect.Lists; -import java.util.Collections; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.phoenix.compile.KeyPart; -import static org.apache.phoenix.expression.function.ScalarFunction.evaluateExpression; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.PColumn; -import static org.apache.phoenix.schema.PDataType.DECIMAL; /** * http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java index 7fb99ff..a54b5b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java @@ -18,31 +18,28 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; -import java.util.Collection; import java.util.Collections; import java.util.List; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.parse.HintNode; -import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.query.StatsManager; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; /** @@ -54,12 +51,11 @@ import org.apache.phoenix.util.ReadOnlyProps; */ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRegionSplitter { - protected final int targetConcurrency; - protected final int maxConcurrency; - protected final int maxIntraRegionParallelization; + protected final long guidePostsDepth; protected final StatementContext context; protected final TableRef tableRef; + private static final Logger logger = LoggerFactory.getLogger(DefaultParallelIteratorRegionSplitter.class); public static DefaultParallelIteratorRegionSplitter getInstance(StatementContext context, TableRef table, HintNode hintNode) { return new DefaultParallelIteratorRegionSplitter(context, table, hintNode); } @@ -68,22 +64,16 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe this.context = context; this.tableRef = table; ReadOnlyProps props = context.getConnection().getQueryServices().getProps(); - this.targetConcurrency = props.getInt(QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB, - QueryServicesOptions.DEFAULT_TARGET_QUERY_CONCURRENCY); - this.maxConcurrency = props.getInt(QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB, - QueryServicesOptions.DEFAULT_MAX_QUERY_CONCURRENCY); - Preconditions.checkArgument(targetConcurrency >= 1, "Invalid target concurrency: " + targetConcurrency); - Preconditions.checkArgument(maxConcurrency >= targetConcurrency , "Invalid max concurrency: " + maxConcurrency); - this.maxIntraRegionParallelization = hintNode.hasHint(Hint.NO_INTRA_REGION_PARALLELIZATION) ? 1 : props.getInt(QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, - QueryServicesOptions.DEFAULT_MAX_INTRA_REGION_PARALLELIZATION); - Preconditions.checkArgument(maxIntraRegionParallelization >= 1 , "Invalid max intra region parallelization: " + maxIntraRegionParallelization); + this.guidePostsDepth = props.getLong(QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB, + QueryServicesOptions.DEFAULT_HISTOGRAM_BYTE_DEPTH); } // Get the mapping between key range and the regions that contains them. protected List<HRegionLocation> getAllRegions() throws SQLException { Scan scan = context.getScan(); PTable table = tableRef.getTable(); - List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table.getPhysicalName().getBytes()); + List<HRegionLocation> allTableRegions = context.getConnection().getQueryServices() + .getAllTableRegions(table.getPhysicalName().getBytes()); // If we're not salting, then we've already intersected the minMaxRange with the scan range // so there's nothing to do here. return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow()); @@ -107,7 +97,8 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe regions = Iterables.filter(allTableRegions, new Predicate<HRegionLocation>() { @Override public boolean apply(HRegionLocation location) { - KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey()); + KeyRange regionKeyRange = KeyRange.getKeyRange(location.getRegionInfo().getStartKey(), location + .getRegionInfo().getEndKey()); return keyRange.intersect(regionKeyRange) != KeyRange.EMPTY_RANGE; } }); @@ -115,109 +106,66 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe } protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) { - if (regions.isEmpty()) { - return Collections.emptyList(); - } - - StatsManager statsManager = context.getConnection().getQueryServices().getStatsManager(); - // the splits are computed as follows: - // - // let's suppose: - // t = target concurrency - // m = max concurrency - // r = the number of regions we need to scan - // - // if r >= t: - // scan using regional boundaries - // elif r > t/2: - // split each region in s splits such that: - // s = max(x) where s * x < m - // else: - // split each region in s splits such that: - // s = max(x) where s * x < t - // - // The idea is to align splits with region boundaries. If rows are not evenly - // distributed across regions, using this scheme compensates for regions that - // have more rows than others, by applying tighter splits and therefore spawning - // off more scans over the overloaded regions. - int splitsPerRegion = regions.size() >= targetConcurrency ? 1 : (regions.size() > targetConcurrency / 2 ? maxConcurrency : targetConcurrency) / regions.size(); - splitsPerRegion = Math.min(splitsPerRegion, maxIntraRegionParallelization); - // Create a multi-map of ServerName to List<KeyRange> which we'll use to round robin from to ensure - // that we keep each region server busy for each query. - ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = ArrayListMultimap.create(regions.size(),regions.size() * splitsPerRegion);; - if (splitsPerRegion == 1) { - for (HRegionLocation region : regions) { - keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region)); - } + if (regions.isEmpty()) { return Collections.emptyList(); } + Scan scan = context.getScan(); + PTable table = this.tableRef.getTable(); + byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table); + List<byte[]> gps = Lists.newArrayList(); + + if (table.getColumnFamilies().isEmpty()) { + // For sure we can get the defaultCF from the table + gps = table.getGuidePosts(); } else { - // Maintain bucket for each server and then returns KeyRanges in round-robin - // order to ensure all servers are utilized. - for (HRegionLocation region : regions) { - byte[] startKey = region.getRegionInfo().getStartKey(); - byte[] stopKey = region.getRegionInfo().getEndKey(); - boolean lowerUnbound = Bytes.compareTo(startKey, HConstants.EMPTY_START_ROW) == 0; - boolean upperUnbound = Bytes.compareTo(stopKey, HConstants.EMPTY_END_ROW) == 0; - /* - * If lower/upper unbound, get the min/max key from the stats manager. - * We use this as the boundary to split on, but we still use the empty - * byte as the boundary in the actual scan (in case our stats are out - * of date). - */ - if (lowerUnbound) { - startKey = statsManager.getMinKey(tableRef); - if (startKey == null) { - keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region)); - continue; - } - } - if (upperUnbound) { - stopKey = statsManager.getMaxKey(tableRef); - if (stopKey == null) { - keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region)); - continue; + try { + if (scan.getFamilyMap().size() > 0) { + if (scan.getFamilyMap().containsKey(defaultCF)) { // Favor using default CF if it's used in scan + gps = table.getColumnFamily(defaultCF).getGuidePosts(); + } else { // Otherwise, just use first CF in use by scan + gps = table.getColumnFamily(scan.getFamilyMap().keySet().iterator().next()).getGuidePosts(); } - } - - byte[][] boundaries = null; - // Both startKey and stopKey will be empty the first time - if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) { - // Bytes.split may return null if the key space - // between start and end key is too small - keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region)); } else { - keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1])); - if (boundaries.length > 1) { - for (int i = 1; i < boundaries.length-2; i++) { - keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, boundaries[i+1], false)); - } - keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], false)); - } + gps = table.getColumnFamily(defaultCF).getGuidePosts(); } + } catch (ColumnFamilyNotFoundException cfne) { + // Alter table does this } + } - List<KeyRange> splits = Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion); - // as documented for ListMultimap - Collection<Collection<KeyRange>> values = keyRangesPerRegion.asMap().values(); - List<Collection<KeyRange>> keyRangesList = Lists.newArrayList(values); - // Randomize range order to ensure that we don't hit the region servers in the same order every time - // thus helping to distribute the load more evenly - Collections.shuffle(keyRangesList); - // Transpose values in map to get regions in round-robin server order. This ensures that - // all servers will be used to process the set of parallel threads available in our executor. - int i = 0; - boolean done; - do { - done = true; - for (int j = 0; j < keyRangesList.size(); j++) { - List<KeyRange> keyRanges = (List<KeyRange>)keyRangesList.get(j); - if (i < keyRanges.size()) { - splits.add(keyRanges.get(i)); - done = false; + + List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size()); + byte[] currentKey = regions.get(0).getRegionInfo().getStartKey(); + byte[] endKey = null; + int regionIndex = 0; + int guideIndex = 0; + int gpsSize = gps.size(); + int regionSize = regions.size(); + if (currentKey.length > 0) { + guideIndex = Collections.binarySearch(gps, currentKey, Bytes.BYTES_COMPARATOR); + guideIndex = (guideIndex < 0 ? -(guideIndex + 1) : (guideIndex + 1)); + } + // Merge bisect with guideposts for all but the last region + while (regionIndex < regionSize) { + byte[] currentGuidePost; + currentKey = regions.get(regionIndex).getRegionInfo().getStartKey(); + endKey = regions.get(regionIndex++).getRegionInfo().getEndKey(); + while (guideIndex < gpsSize + && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) { + KeyRange keyRange = KeyRange.getKeyRange(currentKey, currentGuidePost); + if (keyRange != KeyRange.EMPTY_RANGE) { + guidePosts.add(keyRange); } + currentKey = currentGuidePost; + guideIndex++; + } + KeyRange keyRange = KeyRange.getKeyRange(currentKey, endKey); + if (keyRange != KeyRange.EMPTY_RANGE) { + guidePosts.add(keyRange); } - i++; - } while (!done); - return splits; + } + if (logger.isDebugEnabled()) { + logger.debug("The captured guideposts are: " + guidePosts); + } + return guidePosts; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 987169a..cb7d492 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -105,6 +105,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final byte[] SYSTEM_CATALOG_TABLE_BYTES = Bytes.toBytes(SYSTEM_CATALOG_SCHEMA); public static final String SYSTEM_CATALOG_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CATALOG_TABLE); public static final byte[] SYSTEM_CATALOG_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_CATALOG_SCHEMA_BYTES); + public static final String SYSTEM_STATS_TABLE = "STATS"; + public static final byte[] SYSTEM_STATS_BYTES = Bytes.toBytes(SYSTEM_STATS_TABLE); + public static final String SYSTEM_STATS_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_STATS_TABLE); + public static final byte[] SYSTEM_STATS_NAME_BYTES = SchemaUtil.getTableNameAsBytes(SYSTEM_CATALOG_TABLE_BYTES, SYSTEM_STATS_BYTES); public static final String SYSTEM_CATALOG_ALIAS = "\"SYSTEM.TABLE\""; @@ -112,6 +116,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final byte[] TABLE_NAME_BYTES = Bytes.toBytes(TABLE_NAME); public static final String TABLE_TYPE = "TABLE_TYPE"; public static final byte[] TABLE_TYPE_BYTES = Bytes.toBytes(TABLE_TYPE); + public static final String PHYSICAL_NAME = "PHYSICAL_NAME"; + public static final byte[] PHYSICAL_NAME_BYTES = Bytes.toBytes(PHYSICAL_NAME); public static final String COLUMN_FAMILY = "COLUMN_FAMILY"; public static final String TABLE_CAT = "TABLE_CAT"; @@ -222,6 +228,16 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final String INDEX_DISABLE_TIMESTAMP = "INDEX_DISABLE_TIMESTAMP"; public static final byte[] INDEX_DISABLE_TIMESTAMP_BYTES = Bytes.toBytes(INDEX_DISABLE_TIMESTAMP); + public static final String REGION_NAME = "REGION_NAME"; + public static final byte[] REGION_NAME_BYTES = Bytes.toBytes(REGION_NAME); + public static final String GUIDE_POSTS = "GUIDE_POSTS"; + public static final byte[] GUIDE_POSTS_BYTES = Bytes.toBytes(GUIDE_POSTS); + public static final String MIN_KEY = "MIN_KEY"; + public static final byte[] MIN_KEY_BYTES = Bytes.toBytes(MIN_KEY); + public static final String MAX_KEY = "MAX_KEY"; + public static final byte[] MAX_KEY_BYTES = Bytes.toBytes(MAX_KEY); + public static final String LAST_STATS_UPDATE_TIME = "LAST_STATS_UPDATE_TIME"; + public static final byte[] LAST_STATS_UPDATE_TIME_BYTES = Bytes.toBytes(LAST_STATS_UPDATE_TIME); private final PhoenixConnection connection; private final ResultSet emptyResultSet; http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 45e6973..2840dca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -91,6 +91,7 @@ import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.TableNode; +import org.apache.phoenix.parse.UpdateStatisticsStatement; import org.apache.phoenix.parse.UpsertStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; @@ -644,6 +645,49 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho }; } } + + private static class ExecutableUpdateStatisticsStatement extends UpdateStatisticsStatement implements + CompilableStatement { + + public ExecutableUpdateStatisticsStatement(NamedTableNode table) { + super(table); + } + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + final StatementContext context = new StatementContext(stmt); + return new MutationPlan() { + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("ANALYZE")); + } + + @Override + public PhoenixConnection getConnection() { + return stmt.getConnection(); + } + + @Override + public MutationState execute() throws SQLException { + MetaDataClient client = new MetaDataClient(getConnection()); + return client.updateStatistics(ExecutableUpdateStatisticsStatement.this); + } + }; + } + + } private static class ExecutableAddColumnStatement extends AddColumnStatement implements CompilableStatement { @@ -799,6 +843,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho public ExplainStatement explain(BindableStatement statement) { return new ExecutableExplainStatement(statement); } + + @Override + public UpdateStatisticsStatement updateStatistics(NamedTableNode table) { + return new ExecutableUpdateStatisticsStatement(table); + } } static class PhoenixStatementParser extends SQLParser { http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index f1e39eb..7032d83 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -338,6 +338,10 @@ public class ParseNodeFactory { public DivideParseNode divide(List<ParseNode> children) { return new DivideParseNode(children); } + + public UpdateStatisticsStatement updateStatistics(NamedTableNode table) { + return new UpdateStatisticsStatement(table); + } public FunctionParseNode functionDistinct(String name, List<ParseNode> args) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java new file mode 100644 index 0000000..9eff74a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/UpdateStatisticsStatement.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +public class UpdateStatisticsStatement extends SingleTableStatement { + + public UpdateStatisticsStatement(NamedTableNode table) { + super(table, 0); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 5f43a63..a05acde 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -63,8 +63,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException; - public StatsManager getStatsManager(); - public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException; public PhoenixConnection connect(String url, Properties info) throws SQLException; @@ -96,6 +94,9 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated void addConnection(PhoenixConnection connection) throws SQLException; void removeConnection(PhoenixConnection connection) throws SQLException; + long updateStatistics(KeyRange keyRange, byte[] tableName) + throws SQLException; + /** * @return the {@link KeyValueBuilder} that is valid for the locally installed version of HBase. */ @@ -105,4 +106,5 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public boolean supportsFeature(Feature feature); public String getUserName(); + public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, long clientTS) throws SQLException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 086dc0f..0676e86 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -102,6 +102,9 @@ import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.stat.StatisticsCollector; +import org.apache.phoenix.schema.stat.StatisticsCollectorProtocol; +import org.apache.phoenix.schema.stat.StatisticsCollectorResponse; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.MetaDataUtil; @@ -118,6 +121,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.protobuf.ServiceException; public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); @@ -129,7 +133,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final ReadOnlyProps props; private final String userName; private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices; - private final StatsManager statsManager; // Cache the latest meta data here for future connections // writes guarded by "latestMetaDataLock" @@ -185,9 +188,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // TODO: should we track connection wide memory usage or just org-wide usage? // If connection-wide, create a MemoryManager here, otherwise just use the one from the delegate this.childServices = new ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices>(INITIAL_CHILD_SERVICES_CAPACITY); - int statsUpdateFrequencyMs = this.getProps().getInt(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS); - int maxStatsAgeMs = this.getProps().getInt(QueryServices.MAX_STATS_AGE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_STATS_AGE_MS); - this.statsManager = new StatsManagerImpl(this, statsUpdateFrequencyMs, maxStatsAgeMs); // find the HBase version and use that to determine the KeyValueBuilder that should be used String hbaseVersion = VersionInfo.getVersion(); @@ -216,11 +216,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } @Override - public StatsManager getStatsManager() { - return this.statsManager; - } - - @Override public HTableInterface getTable(byte[] tableName) throws SQLException { try { return HBaseFactoryProvider.getHTableFactory().getTable(tableName, connection, getExecutor()); @@ -275,42 +270,29 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement sqlE = e; } finally { try { - // Clear any client-side caches. - statsManager.clearStats(); - } catch (SQLException e) { + childServices.clear(); + synchronized (latestMetaDataLock) { + latestMetaData = null; + latestMetaDataLock.notifyAll(); + } + if (connection != null) connection.close(); + } catch (IOException e) { if (sqlE == null) { - sqlE = e; + sqlE = ServerUtil.parseServerException(e); } else { - sqlE.setNextException(e); + sqlE.setNextException(ServerUtil.parseServerException(e)); } } finally { try { - childServices.clear(); - synchronized (latestMetaDataLock) { - latestMetaData = null; - latestMetaDataLock.notifyAll(); - } - if (connection != null) connection.close(); - } catch (IOException e) { + super.close(); + } catch (SQLException e) { if (sqlE == null) { - sqlE = ServerUtil.parseServerException(e); + sqlE = e; } else { - sqlE.setNextException(ServerUtil.parseServerException(e)); + sqlE.setNextException(e); } } finally { - try { - super.close(); - } catch (SQLException e) { - if (sqlE == null) { - sqlE = e; - } else { - sqlE.setNextException(e); - } - } finally { - if (sqlE != null) { - throw sqlE; - } - } + if (sqlE != null) { throw sqlE; } } } } @@ -540,6 +522,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, 1, null); } + if (!descriptor.hasCoprocessor(StatisticsCollector.class.getName())) { + descriptor.addCoprocessor(StatisticsCollector.class.getName(), null, 1, null); + } // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. Also, @@ -1310,6 +1295,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement ConnectionQueryServicesImpl.this, url, scnProps, newEmptyMetaData()); try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); + // TODO : Get this from a configuration + metaConnection.createStatement().executeUpdate( + QueryConstants.CREATE_STATS_TABLE_METADATA); } catch (NewerTableAlreadyExistsException ignore) { // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed timestamp. // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. @@ -1811,4 +1799,78 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement throw new IllegalStateException("Connection to the cluster is closed"); } + @Override + public long updateStatistics(final KeyRange keyRange, final byte[] tableName) throws SQLException { + HTableInterface ht = null; + try { + ht = this.getTable(tableName); + long noOfRowsScanned = 0; + Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse> callable = + new Batch.Call<StatisticsCollectorProtocol, StatisticsCollectorResponse>() { + @Override + public StatisticsCollectorResponse call(StatisticsCollectorProtocol instance) throws IOException { + return instance.collectStat(keyRange); + } + }; + Map<byte[], StatisticsCollectorResponse> result = ht.coprocessorExec(StatisticsCollectorProtocol.class, + keyRange.getLowerRange(), keyRange.getUpperRange(), callable); + for (StatisticsCollectorResponse response : result.values()) { + noOfRowsScanned += response.getRowsScanned(); + } + return noOfRowsScanned; + } catch (ServiceException e) { + throw new SQLException("Unable to update the statistics for the table " + tableName, e); + } catch (TableNotFoundException e) { + throw new SQLException("Unable to update the statistics for the table " + tableName, e); + } catch (Throwable e) { + throw new SQLException("Unable to update the statistics for the table " + tableName, e); + } finally { + if (ht != null) { + try { + ht.close(); + } catch (IOException e) { + throw new SQLException("Unable to close the table " + tableName + " after collecting stats", e); + } + } + } + } + + @Override + public void clearCacheForTable(final byte[] tenantId, final byte[] schemaName, final byte[] tableName, + final long clientTS) throws SQLException { + // clear the meta data cache for the table here + try { + SQLException sqlE = null; + HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + byte[] tableKey = SchemaUtil.getTableKey(tenantId, schemaName, tableName); + try { + metaDataCoprocessorExec(tableKey, + new Batch.Call<MetaDataProtocol, MetaDataMutationResult>() { + @Override + public MetaDataMutationResult call(MetaDataProtocol instance) throws IOException { + instance.clearCacheForTable(tenantId, schemaName, tableName, clientTS); + // TODO : Should this really return a result?Return null + return new MetaDataMutationResult(); + } + }); + + } catch (Throwable e) { + sqlE = new SQLException(e); + } finally { + try { + htable.close(); + } catch (IOException e) { + if (sqlE == null) { + sqlE = ServerUtil.parseServerException(e); + } else { + sqlE.setNextException(ServerUtil.parseServerException(e)); + } + } finally { + if (sqlE != null) { throw sqlE; } + } + } + } catch (Exception e) { + throw new SQLException(ServerUtil.parseServerException(e)); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 09f42aa..4e155e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -65,7 +65,6 @@ import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.SequenceNotFoundException; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; @@ -116,30 +115,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple } @Override - public StatsManager getStatsManager() { - return new StatsManager() { - - @Override - public byte[] getMinKey(TableRef table) { - return HConstants.EMPTY_START_ROW; - } - - @Override - public byte[] getMaxKey(TableRef table) { - return HConstants.EMPTY_END_ROW; - } - - @Override - public void updateStats(TableRef table) throws SQLException { - } - - @Override - public void clearStats() throws SQLException { - } - }; - } - - @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { return Collections.singletonList(new HRegionLocation(new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),"localhost",-1)); } @@ -207,6 +182,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, 0, null); } + @Override + public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException { + // Noop + return 0; + } + + @Override + public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS) + throws SQLException {} // TODO: share this with ConnectionQueryServicesImpl @Override public void init(String url, Properties props) throws SQLException { @@ -242,6 +226,15 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed timestamp. // A TableAlreadyExistsException is not thrown, since the table only exists *after* this fixed timestamp. } + try { + // TODO : Get this from a configuration + metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_STATS_TABLE_METADATA); + } catch (NewerTableAlreadyExistsException ignore) { + // Ignore, as this will happen if the SYSTEM.SEQUENCE already exists at this fixed + // timestamp. + // A TableAlreadyExistsException is not thrown, since the table only exists *after* this + // fixed timestamp. + } } catch (SQLException e) { sqlE = e; } finally { http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 0b6a399..fa01f09 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -65,11 +65,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple } @Override - public StatsManager getStatsManager() { - return getDelegate().getStatsManager(); - } - - @Override public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException { return getDelegate().getAllTableRegions(tableName); } @@ -231,4 +226,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public String getUserName() { return getDelegate().getUserName(); } + + @Override + public long updateStatistics(KeyRange keyRange, byte[] tableName) throws SQLException { + return getDelegate().updateStatistics(keyRange, tableName); + } + + @Override + public void clearCacheForTable(byte[] tenantId, byte[] schemaName, byte[] tableName, long clientTS) + throws SQLException { + getDelegate().clearCacheForTable(tenantId, schemaName, tableName, clientTS); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 1e128c2..f5ba490 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -34,23 +34,31 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INCREMENT_BY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_AUTOINCREMENT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NULLABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_STATS_UPDATE_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LIMIT_REACHED_FLAG; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LINK_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_KEY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MAX_VALUE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_KEY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NULLABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REGION_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCOPE_CATALOG; @@ -66,6 +74,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; @@ -77,8 +86,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; + import java.math.BigDecimal; import org.apache.hadoop.hbase.HConstants; @@ -110,6 +118,7 @@ public interface QueryConstants { public enum JoinType {INNER, LEFT_OUTER} public final static String SYSTEM_SCHEMA_NAME = "SYSTEM"; + public final static byte[] SYSTEM_SCHEMA_NAME_BYTES = Bytes.toBytes(SYSTEM_SCHEMA_NAME); public final static String PHOENIX_METADATA = "table"; public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s"); @@ -219,6 +228,22 @@ public interface QueryConstants { HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; + public static final String CREATE_STATS_TABLE_METADATA = + "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" + + // PK columns + PHYSICAL_NAME + " VARCHAR NOT NULL," + + COLUMN_FAMILY + " VARCHAR," + + REGION_NAME + " VARCHAR," + + GUIDE_POSTS + " VARBINARY[]," + + MIN_KEY + " VARBINARY," + + MAX_KEY + " VARBINARY," + + LAST_STATS_UPDATE_TIME+ " DATE, "+ + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + + PHYSICAL_NAME + "," + + COLUMN_FAMILY + ","+ REGION_NAME+"))\n" + + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + + HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; + public static final String CREATE_SEQUENCE_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" + TENANT_ID + " VARCHAR NULL," + http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index da35626..14fea16 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -68,18 +68,14 @@ public interface QueryServices extends SQLCloseable { public static final String MAX_MEMORY_WAIT_MS_ATTRIB = "phoenix.query.maxGlobalMemoryWaitMs"; public static final String MAX_TENANT_MEMORY_PERC_ATTRIB = "phoenix.query.maxTenantMemoryPercentage"; public static final String MAX_SERVER_CACHE_SIZE_ATTRIB = "phoenix.query.maxServerCacheBytes"; - public static final String TARGET_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.targetConcurrency"; - public static final String MAX_QUERY_CONCURRENCY_ATTRIB = "phoenix.query.maxConcurrency"; public static final String DATE_FORMAT_ATTRIB = "phoenix.query.dateFormat"; public static final String NUMBER_FORMAT_ATTRIB = "phoenix.query.numberFormat"; public static final String STATS_UPDATE_FREQ_MS_ATTRIB = "phoenix.query.statsUpdateFrequency"; - public static final String MAX_STATS_AGE_MS_ATTRIB = "phoenix.query.maxStatsAge"; public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin"; public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs"; - public static final String MAX_INTRA_REGION_PARALLELIZATION_ATTRIB = "phoenix.query.maxIntraRegionParallelization"; public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable"; public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes"; public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows"; @@ -123,6 +119,7 @@ public interface QueryServices extends SQLCloseable { // Index will be partially re-built from index disable time stamp - following overlap time public static final String INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB = "phoenix.index.failure.handling.rebuild.overlap.time"; + public static final String HISTOGRAM_BYTE_DEPTH_ATTRIB = "phoenix.guidepost.width"; /** * Get executor service used for parallel scans http://git-wip-us.apache.org/repos/asf/phoenix/blob/15a54d55/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 78681a0..9a3284e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -24,16 +24,15 @@ import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.HISTOGRAM_BYTE_DEPTH_ATTRIB; import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB; import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB; import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB; -import static org.apache.phoenix.query.QueryServices.MAX_INTRA_REGION_PARALLELIZATION_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_MUTATION_SIZE_ATTRIB; -import static org.apache.phoenix.query.QueryServices.MAX_QUERY_CONCURRENCY_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB; @@ -51,7 +50,6 @@ import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY; import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; -import static org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB; @@ -74,7 +72,7 @@ import org.apache.phoenix.util.ReadOnlyProps; public class QueryServicesOptions { public static final int DEFAULT_KEEP_ALIVE_MS = 60000; public static final int DEFAULT_THREAD_POOL_SIZE = 128; - public static final int DEFAULT_QUEUE_SIZE = 500; + public static final int DEFAULT_QUEUE_SIZE = 5000; public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp"; @@ -107,6 +105,7 @@ public class QueryServicesOptions { // latency and client-side spooling/buffering. Smaller means less initial // latency and less parallelization. public static final long DEFAULT_SCAN_RESULT_CHUNK_SIZE = 2999; + public static final long DEFAULT_HISTOGRAM_BYTE_DEPTH = 1024 * 1024; // // Spillable GroupBy - SPGBY prefix @@ -167,13 +166,10 @@ public class QueryServicesOptions { .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC) .setIfUnset(MAX_SERVER_CACHE_SIZE_ATTRIB, DEFAULT_MAX_SERVER_CACHE_SIZE) .setIfUnset(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE) - .setIfUnset(TARGET_QUERY_CONCURRENCY_ATTRIB, DEFAULT_TARGET_QUERY_CONCURRENCY) - .setIfUnset(MAX_QUERY_CONCURRENCY_ATTRIB, DEFAULT_MAX_QUERY_CONCURRENCY) .setIfUnset(DATE_FORMAT_ATTRIB, DEFAULT_DATE_FORMAT) .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS) .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN) .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE) - .setIfUnset(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION) .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) .setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES) .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS) @@ -185,6 +181,7 @@ public class QueryServicesOptions { .setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES) .setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE) .setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE) + .setIfUnset(HISTOGRAM_BYTE_DEPTH_ATTRIB, DEFAULT_HISTOGRAM_BYTE_DEPTH); ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set @@ -275,14 +272,6 @@ public class QueryServicesOptions { return set(SCAN_CACHE_SIZE_ATTRIB, scanFetchSize); } - public QueryServicesOptions setMaxQueryConcurrency(int maxQueryConcurrency) { - return set(MAX_QUERY_CONCURRENCY_ATTRIB, maxQueryConcurrency); - } - - public QueryServicesOptions setTargetQueryConcurrency(int targetQueryConcurrency) { - return set(TARGET_QUERY_CONCURRENCY_ATTRIB, targetQueryConcurrency); - } - public QueryServicesOptions setDateFormat(String dateFormat) { return set(DATE_FORMAT_ATTRIB, dateFormat); } @@ -291,6 +280,10 @@ public class QueryServicesOptions { return set(STATS_UPDATE_FREQ_MS_ATTRIB, frequencyMs); } + public QueryServicesOptions setHistogramDepthBytes(int depth) { + return set(HISTOGRAM_BYTE_DEPTH_ATTRIB, depth); + } + public QueryServicesOptions setCallQueueRoundRobin(boolean isRoundRobin) { return set(CALL_QUEUE_PRODUCER_ATTRIB_NAME, isRoundRobin); } @@ -303,10 +296,6 @@ public class QueryServicesOptions { return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize); } - public QueryServicesOptions setMaxIntraRegionParallelization(int maxIntraRegionParallelization) { - return set(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, maxIntraRegionParallelization); - } - public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) { return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable); } @@ -375,11 +364,7 @@ public class QueryServicesOptions { public int getMutateBatchSize() { return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE); } - - public int getMaxIntraRegionParallelization() { - return config.getInt(MAX_INTRA_REGION_PARALLELIZATION_ATTRIB, DEFAULT_MAX_INTRA_REGION_PARALLELIZATION); - } - + public boolean isUseIndexes() { return config.getBoolean(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES); } @@ -436,4 +421,7 @@ public class QueryServicesOptions { return set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, walEditCodec); } + public QueryServicesOptions setHistogramByteDepth(long byteDepth) { + return set(HISTOGRAM_BYTE_DEPTH_ATTRIB, byteDepth); + } }
