Repository: phoenix Updated Branches: refs/heads/master 534fa05da -> 5e33dc12b
PHOENIX-153 Implement TABLESAMPLE clause (Ethan Wang) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5e33dc12 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5e33dc12 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5e33dc12 Branch: refs/heads/master Commit: 5e33dc12bc088bd0008d89f0a5cd7d5c368efa25 Parents: 534fa05 Author: James Taylor <[email protected]> Authored: Tue Aug 1 11:52:24 2017 -0700 Committer: James Taylor <[email protected]> Committed: Tue Aug 1 11:52:24 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/QueryWithTableSampleIT.java | 305 +++++++++++++++++++ phoenix-core/src/main/antlr3/PhoenixSQL.g | 7 +- .../apache/phoenix/compile/JoinCompiler.java | 24 +- .../phoenix/iterate/BaseResultIterators.java | 28 ++ .../phoenix/iterate/TableSamplerPredicate.java | 114 +++++++ .../apache/phoenix/optimize/QueryOptimizer.java | 2 +- .../apache/phoenix/parse/ConcreteTableNode.java | 23 ++ .../apache/phoenix/parse/DeleteStatement.java | 5 + .../phoenix/parse/FilterableStatement.java | 1 + .../apache/phoenix/parse/NamedTableNode.java | 21 +- .../apache/phoenix/parse/ParseNodeFactory.java | 26 +- .../apache/phoenix/parse/SelectStatement.java | 15 + 12 files changed, 554 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java new file mode 100644 index 0000000..a7a7952 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java @@ -0,0 +1,305 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.phoenix.exception.PhoenixParserException; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Before; +import org.junit.Test; + + +public class QueryWithTableSampleIT extends ParallelStatsEnabledIT { + private String tableName; + private String joinedTableName; + + @Before + public void generateTableNames() { + tableName = "T_" + generateUniqueName(); + joinedTableName = "T_" + generateUniqueName(); + } + + @Test(expected=PhoenixParserException.class) + public void testSingleQueryWrongSyntax() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String 
query = "SELECT i1, i2 FROM " + tableName +" tablesample 15 "; + + ResultSet rs = conn.createStatement().executeQuery(query); + } finally { + conn.close(); + } + } + + @Test(expected=PhoenixParserException.class) + public void testSingleQueryWrongSamplingRate() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT i1, i2 FROM " + tableName +" tablesample (175) "; + + ResultSet rs = conn.createStatement().executeQuery(query); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQueryZeroSamplingRate() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT i1, i2 FROM " + tableName +" tablesample (0) "; + ResultSet rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT i1, i2 FROM " + tableName +" tablesample (45) "; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals(200, rs.getInt(2)); + + assertTrue(rs.next()); + assertEquals(6, rs.getInt(1)); + assertEquals(600, rs.getInt(2)); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQueryWithWhereClause() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT i1, i2 
FROM " + tableName +" tablesample (22) where i2>=300 and i1<14 LIMIT 4 "; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals(8, rs.getInt(1)); + + assertTrue(rs.next()); + assertEquals(10, rs.getInt(1)); + + assertTrue(rs.next()); + assertEquals(12, rs.getInt(1)); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQueryWithAggregator() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT count(i1) FROM " + tableName +" tablesample (22) where i2>=3000 or i1<2 "; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals(14, rs.getInt(1)); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQueryWithUnion() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT * FROM " + tableName +" tablesample (100) where i1<2 union all SELECT * FROM " + tableName +" tablesample (2) where i2<6000"; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(30, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(44, rs.getInt(1)); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQueryWithSubQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT count(*) FROM (SELECT * FROM " + tableName +" tablesample (50))"; + ResultSet rs = 
conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals(50, rs.getInt(1)); + } finally { + conn.close(); + } + } + + @Test + public void testSingleQueryWithJoins() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "SELECT count(*) FROM " + tableName +" as A tablesample (45), "+joinedTableName+" as B tablesample (75) where A.i1=B.i1"; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertTrue(rs.next()); + assertEquals(31, rs.getInt(1)); + } finally { + conn.close(); + } + } + + @Test + public void testExplainSingleQuery() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "EXPLAIN SELECT i1, i2 FROM " + tableName +" tablesample (45) "; + ResultSet rs = conn.createStatement().executeQuery(query); + assertEquals("CLIENT PARALLEL 1-WAY 0.45-SAMPLED FULL SCAN OVER " + tableName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY",QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } + + @Test + public void testExplainSingleQueryWithUnion() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "EXPLAIN SELECT * FROM " + tableName +" tablesample (100) where i1<2 union all SELECT * FROM " + tableName +" tablesample (2) where i2<6000"; + ResultSet rs = conn.createStatement().executeQuery(query); + + assertEquals("UNION ALL OVER 2 QUERIES\n" + + " CLIENT PARALLEL 1-WAY 1.0-SAMPLED RANGE SCAN OVER " + tableName+" [*] - [2]\n"+ + " SERVER FILTER BY FIRST KEY ONLY\n" + + " CLIENT PARALLEL 1-WAY 0.02-SAMPLED FULL SCAN OVER " + 
tableName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY AND I2 < 6000",QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } + + @Test + public void testExplainSingleQueryWithJoins() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + prepareTableWithValues(conn, 100); + String query = "EXPLAIN SELECT count(*) FROM " + tableName +" as A tablesample (45), "+joinedTableName+" as B tablesample (75) where A.i1=B.i1"; + System.out.println(query); + ResultSet rs = conn.createStatement().executeQuery(query); + + assertEquals("CLIENT PARALLEL 1-WAY 0.45-SAMPLED FULL SCAN OVER " + tableName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO SINGLE ROW\n" + + " PARALLEL INNER-JOIN TABLE 0 (SKIP MERGE)\n" + + " CLIENT PARALLEL 1-WAY 0.75-SAMPLED FULL SCAN OVER " + joinedTableName + "\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " DYNAMIC SERVER FILTER BY A.I1 IN (B.I1)",QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } + + /** + * Prepare tables with stats updated. format of first table such as + * i1, i2 + * 1, 100 + * 2, 200 + * 3, 300 + * ... 
+ * + * @param conn + * @param nRows + * @throws Exception + */ + final private void prepareTableWithValues(final Connection conn, final int nRows) throws Exception { + conn.createStatement().execute("create table " + tableName + "\n" + + " (i1 integer not null, i2 integer not null\n" + + " CONSTRAINT pk PRIMARY KEY (i1,i2))"); + + final PreparedStatement stmt = conn.prepareStatement( + "upsert into " + tableName + + " VALUES (?, ?)"); + for (int i = 0; i < nRows; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i*100); + stmt.execute(); + } + conn.commit(); + + conn.createStatement().execute("UPDATE STATISTICS " + tableName); + + //Prepare for second table + conn.createStatement().execute("create table " + joinedTableName + "\n" + + " (i1 integer not null, i2 integer not null\n" + + " CONSTRAINT pk PRIMARY KEY (i1,i2))"); + + final PreparedStatement stmt2 = conn.prepareStatement( + "upsert into " + joinedTableName + + " VALUES (?, ?)"); + for (int i = 0; i < nRows; i++) { + stmt2.setInt(1, i); + stmt2.setInt(2, i*-100); + stmt2.execute(); + } + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + joinedTableName); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/antlr3/PhoenixSQL.g ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g index 66d50e9..721b514 100644 --- a/phoenix-core/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g @@ -114,6 +114,7 @@ tokens TRACE='trace'; ASYNC='async'; SAMPLING='sampling'; + TABLESAMPLE='tablesample'; UNION='union'; FUNCTION='function'; AS='as'; @@ -794,6 +795,10 @@ sampling_rate returns [LiteralParseNode ret] : l=literal { $ret = l; } ; +tableSampleNode returns [LiteralParseNode ret] + : l=literal { $ret = l; } + ; + hintClause returns [HintNode ret] : c=ML_HINT { $ret = factory.hint(c.getText()); } ; @@ -851,7 +856,7 @@ table_ref 
returns [TableNode ret] table_factor returns [TableNode ret] : LPAREN t=table_list RPAREN { $ret = t; } | n=bind_name ((AS)? alias=identifier)? { $ret = factory.bindTable(alias, factory.table(null,n)); } // TODO: review - | f=from_table_name ((AS)? alias=identifier)? (LPAREN cdefs=dyn_column_defs RPAREN)? { $ret = factory.namedTable(alias,f,cdefs); } + | f=from_table_name ((AS)? alias=identifier)? (LPAREN cdefs=dyn_column_defs RPAREN)? (TABLESAMPLE LPAREN tableSample=tableSampleNode RPAREN)? { $ret = factory.namedTable(alias,f,cdefs, tableSample);} | LPAREN s=select_node RPAREN ((AS)? alias=identifier)? { $ret = factory.derivedTable(alias, s); } ; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 49de75b..887e2d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -50,6 +50,7 @@ import org.apache.phoenix.parse.BooleanParseNodeVisitor; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.ComparisonParseNode; +import org.apache.phoenix.parse.ConcreteTableNode; import org.apache.phoenix.parse.DerivedTableNode; import org.apache.phoenix.parse.EqualParseNode; import org.apache.phoenix.parse.HintNode; @@ -185,7 +186,7 @@ public class JoinCompiler { public Pair<Table, List<JoinSpec>> visit(BindTableNode boundTableNode) throws SQLException { TableRef tableRef = resolveTable(boundTableNode.getAlias(), boundTableNode.getName()); List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver); - Table table = new Table(boundTableNode, 
Collections.<ColumnDef>emptyList(), selectNodes, tableRef); + Table table = new Table(boundTableNode, Collections.<ColumnDef>emptyList(), boundTableNode.getTableSamplingRate(), selectNodes, tableRef); return new Pair<Table, List<JoinSpec>>(table, null); } @@ -208,7 +209,7 @@ public class JoinCompiler { throws SQLException { TableRef tableRef = resolveTable(namedTableNode.getAlias(), namedTableNode.getName()); List<AliasedNode> selectNodes = extractFromSelect(select.getSelect(), tableRef, origResolver); - Table table = new Table(namedTableNode, namedTableNode.getDynamicColumns(), selectNodes, tableRef); + Table table = new Table(namedTableNode, namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate(), selectNodes, tableRef); return new Pair<Table, List<JoinSpec>>(table, null); } @@ -614,6 +615,7 @@ public class JoinCompiler { public class Table { private final TableNode tableNode; private final List<ColumnDef> dynamicColumns; + private final Double tableSamplingRate; private final SelectStatement subselect; private final TableRef tableRef; private final List<AliasedNode> selectNodes; // all basic nodes related to this table, no aggregation. 
@@ -621,10 +623,11 @@ public class JoinCompiler { private final List<ParseNode> postFilters; private final boolean isPostFilterConvertible; - private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, + private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, Double tableSamplingRate, List<AliasedNode> selectNodes, TableRef tableRef) { this.tableNode = tableNode; this.dynamicColumns = dynamicColumns; + this.tableSamplingRate=tableSamplingRate; this.subselect = null; this.tableRef = tableRef; this.selectNodes = selectNodes; @@ -637,6 +640,7 @@ public class JoinCompiler { List<AliasedNode> selectNodes, TableRef tableRef) throws SQLException { this.tableNode = tableNode; this.dynamicColumns = Collections.<ColumnDef>emptyList(); + this.tableSamplingRate=ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE; this.subselect = SubselectRewriter.flatten(tableNode.getSelect(), statement.getConnection()); this.tableRef = tableRef; this.selectNodes = selectNodes; @@ -652,6 +656,10 @@ public class JoinCompiler { public List<ColumnDef> getDynamicColumns() { return dynamicColumns; } + + public Double getTableSamplingRate() { + return tableSamplingRate; + } public boolean isSubselect() { return subselect != null; @@ -1188,7 +1196,7 @@ public class JoinCompiler { TableRef tableRef = table.getTableRef(); List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null; List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? 
select.getOrderBy() : null; - SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes()); + SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes()); QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt); if (!plan.getTableRef().equals(tableRef)) { replacement.put(tableRef, plan.getTableRef()); @@ -1244,7 +1252,7 @@ public class JoinCompiler { return namedTableNode; String alias = namedTableNode.getAlias(); - return NODE_FACTORY.namedTable(alias == null ? null : '"' + alias + '"', getReplacedTableName(replaceRef), namedTableNode.getDynamicColumns()); + return NODE_FACTORY.namedTable(alias == null ? 
null : '"' + alias + '"', getReplacedTableName(replaceRef), namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate()); } @Override @@ -1258,11 +1266,11 @@ public class JoinCompiler { for ( TableRef indexTableRef : replacement.values()) { // replace expressions with corresponding matching columns for functional indexes indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(indexTableRef.getTable(), indexTableRef.getTableAlias(), statement.getConnection(), indexSelect.getUdfParseNodes())); - } + } return indexSelect; } - private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy, + private static SelectStatement getSubqueryForOptimizedPlan(HintNode hintNode, List<ColumnDef> dynamicCols, Double tableSamplingRate, TableRef tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy, List<OrderByNode> orderBy, boolean isWildCardSelect, boolean hasSequence, Map<String, UDFParseNode> udfParseNodes) { String schemaName = tableRef.getTable().getSchemaName().getString(); TableName tName = TableName.create(schemaName.length() == 0 ? null : schemaName, tableRef.getTable().getTableName().getString()); @@ -1281,7 +1289,7 @@ public class JoinCompiler { } } String tableAlias = tableRef.getTableAlias(); - TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : '"' + tableAlias + '"', tName, dynamicCols); + TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? 
null : '"' + tableAlias + '"', tName, dynamicCols,tableSamplingRate); return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, null, 0, groupBy != null, hasSequence, Collections.<SelectStatement> emptyList(), udfParseNodes); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 8d6c107..f1db46c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -113,6 +113,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -782,9 +783,32 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } finally { if (stream != null) Closeables.closeQuietly(stream); } + + sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate()); return parallelScans; } + /** + * Loop through List<List<Scan>> parallelScans object, + * rolling dice on each scan based on startRowKey. + * + * All FilterableStatement should have tableSamplingRate. + * In case it is delete statement, an unsupported message is raised. + * In case it is null tableSamplingRate, 100% sampling rate will be applied by default. 
+ * + * @param parallelScans + */ + private void sampleScans(final List<List<Scan>> parallelScans, final Double tableSamplingRate){ + if(tableSamplingRate==null||tableSamplingRate==100d) return; + final Predicate<byte[]> tableSamplerPredicate=TableSamplerPredicate.of(tableSamplingRate); + + for(Iterator<List<Scan>> is = parallelScans.iterator();is.hasNext();){ + for(Iterator<Scan> i=is.next().iterator();i.hasNext();){ + final Scan scan=i.next(); + if(!tableSamplerPredicate.apply(scan.getStartRow())){i.remove();} + } + } + } public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) { if (!reverse) { @@ -1070,6 +1094,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } buf.append(getName()).append(" ").append(size()).append("-WAY "); + + if(this.plan.getStatement().getTableSamplingRate()!=null){ + buf.append(plan.getStatement().getTableSamplingRate()/100D).append("-").append("SAMPLED "); + } try { if (plan.useRoundRobinIterator()) { buf.append("ROUND ROBIN "); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java new file mode 100644 index 0000000..7ee0b6b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import com.google.common.base.Predicate; + +/** + * TableSampler. + * + * A dice rolling on every targeted row to decide if this row is going + * to be picked or not. + * An application is table Sampler, based on boolean result, this row is + * then picked (or rejected) to be part of sample set. + * + * Currently implemented using FNV1a with Lazy mod mapping method to ensure + * the even distribution of hashed result, so that the final sampled result + * will be close to the size of expected + * + */ +public class TableSamplerPredicate implements Predicate<byte[]>{ + private final double tableSamplingRate; + + private TableSamplerPredicate(double tableSamplingRate){ + this.tableSamplingRate=tableSamplingRate; + } + + public static TableSamplerPredicate of(final Double tableSamplingRateRaw){ + assert(tableSamplingRateRaw!=null):"tableSamplingRate can not be null"; + assert(tableSamplingRateRaw>=0d&&tableSamplingRateRaw<=100d):"tableSamplingRate input has to be a rational number between 0 and 100"; + TableSamplerPredicate self=new TableSamplerPredicate(tableSamplingRateRaw); + return self; + } + + @Override + public boolean apply(byte[] bytes) { + final int hashcode_FNV1Lazy=FNV1LazyImpl(bytes); + final boolean result=evaluateWithChance(hashcode_FNV1Lazy); + return result; + } + + /** + * Take build in FNV1a Hash function then apply lazy mod mapping method so that the + * hash is evenly distributed between 0 and 100. 
+ * + * Quoted from http://isthe.com/chongo/tech/comp/fnv/, + * The FNV hash is designed for hash sizes that are a power of 2. + * If you need a hash size that is not a power of two, then you have two choices. + * One method is called the lazy mod mapping method and the other is called the retry method. + * Both involve mapping a range that is a power of 2 onto an arbitrary range. + * + * Lazy mod mapping method: The lazy mod mapping method uses a simple mod on an n-bit hash + * to yield an arbitrary range. + * To produce a hash range between 0 and X use a n-bit FNV hash where n is smallest FNV hash + * that will produce values larger than X without the need for xor-folding. + * + * For example, to produce a value between 0 and 2142779559 using the lazy mod mapping method, + * we select a 32-bit FNV hash because: 2 power 32 > 49999 + * Before the final mod 50000 is performed, + * we check to see if the 32-bit FNV hash value is one of the upper biased values. + * If it is, we perform additional loop cycles until is below the bias level. + * + * An advantage of the lazy mod mapping method is that it requires only 1 more operation: + * only an additional mod is performed at the end. + * The disadvantage of the lazy mod mapping method is that there is a bias against + * the larger values. + * + * @param bytes + * @return + */ + final static private int FNV1LazyImpl(final byte[] bytes){ + final int contentBasedHashCode = java.util.Arrays.hashCode(bytes); + return lazyRedistribute(contentBasedHashCode); + } + + + /** + * Lazy mod mapping method Implementation + * + * Output result should be following the same distribution as input hashcode, + * however re-mapped between 0 and 100. 
+ * + * @param hashcode + * @return + */ + final static private int lazyRedistribute(final int hashcode){ + return java.lang.Math.abs(hashcode%100); + } + + /** + * + * @param hashcode + * @return + */ + final private boolean evaluateWithChance(final int hashcode){ + assert((hashcode>=0)&&(hashcode<=100)):"hashcode should be re-distribute into 0 to 100"; + return (hashcode<tableSamplingRate)?true:false; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 14b7945..2bfc5fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -221,7 +221,7 @@ public class QueryOptimizer { schemaName = schemaName.length() == 0 ? null : '"' + schemaName + '"'; String tableName = '"' + index.getTableName().getString() + '"'; - TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName)); + TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, tableName),select.getTableSamplingRate()); SelectStatement indexSelect = FACTORY.select(select, table); ColumnResolver resolver = FromCompiler.getResolverForQuery(indexSelect, statement.getConnection()); // We will or will not do tuple projection according to the data plan. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java index 640ee7b..6a4ed6f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.parse; +import org.apache.commons.math3.exception.OutOfRangeException; import org.apache.phoenix.util.SchemaUtil; /** @@ -27,22 +28,41 @@ import org.apache.phoenix.util.SchemaUtil; * @since 0.1 */ public abstract class ConcreteTableNode extends TableNode { + //DEFAULT_TABLE_SAMPLING_RATE alternative is to set as 100d + public static final Double DEFAULT_TABLE_SAMPLING_RATE=null; private final TableName name; + private final Double tableSamplingRate; ConcreteTableNode(String alias, TableName name) { + this(alias,name,DEFAULT_TABLE_SAMPLING_RATE); + } + + ConcreteTableNode(String alias, TableName name, Double tableSamplingRate) { super(SchemaUtil.normalizeIdentifier(alias)); this.name = name; + if(tableSamplingRate==null){ + this.tableSamplingRate=DEFAULT_TABLE_SAMPLING_RATE; + }else if(tableSamplingRate<0d||tableSamplingRate>100d){ + throw new OutOfRangeException(tableSamplingRate, 0d, 100d); + }else{ + this.tableSamplingRate=tableSamplingRate; + } } public TableName getName() { return name; } + + public Double getTableSamplingRate(){ + return tableSamplingRate; + } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((tableSamplingRate == null) ? 
0 : tableSamplingRate.hashCode()); return result; } @@ -55,6 +75,9 @@ public abstract class ConcreteTableNode extends TableNode { if (name == null) { if (other.name != null) return false; } else if (!name.equals(other.name)) return false; + if (tableSamplingRate == null) { + if (other.tableSamplingRate != null) return false; + } else if (!tableSamplingRate.equals(other.tableSamplingRate)) return false; return true; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java index fb2d327..331bee4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java @@ -76,5 +76,10 @@ public class DeleteStatement extends DMLStatement implements FilterableStatement public OffsetNode getOffset() { return null; } + + @Override + public Double getTableSamplingRate(){ + throw new UnsupportedOperationException("Table sampling is not allowd for Deletion"); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java index ad54d98..62a4aa2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java @@ -25,6 +25,7 @@ public interface FilterableStatement extends BindableStatement { public boolean isDistinct(); public boolean isAggregate(); public 
List<OrderByNode> getOrderBy(); + public Double getTableSamplingRate(); public LimitNode getLimit(); public OffsetNode getOffset(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java index d3b4505..7944a04 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java @@ -34,7 +34,7 @@ import com.google.common.collect.ImmutableList; public class NamedTableNode extends ConcreteTableNode { private final List<ColumnDef> dynColumns; - + public static NamedTableNode create (String alias, TableName name, List<ColumnDef> dynColumns) { return new NamedTableNode(alias, name, dynColumns); } @@ -47,13 +47,23 @@ public class NamedTableNode extends ConcreteTableNode { return new NamedTableNode(null, TableName.create(schemaName, tableName), Collections.<ColumnDef>emptyList()); } + @Deprecated NamedTableNode(String alias, TableName name) { - super(alias, name); - dynColumns = Collections.<ColumnDef> emptyList(); + this(alias, name, ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE); } - + + @Deprecated NamedTableNode(String alias, TableName name, List<ColumnDef> dynColumns) { - super(alias, name); + this(alias,name,dynColumns,ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE); + } + + NamedTableNode(String alias, TableName name, Double tableSamplingRate) { + super(alias, name, tableSamplingRate); + dynColumns = Collections.<ColumnDef> emptyList(); + } + + NamedTableNode(String alias, TableName name, List<ColumnDef> dynColumns, Double tableSamplingRate) { + super(alias, name, tableSamplingRate); if (dynColumns != null) { this.dynColumns = ImmutableList.copyOf(dynColumns); } else { 
@@ -83,6 +93,7 @@ public class NamedTableNode extends ConcreteTableNode { buf.append(')'); } if (this.getAlias() != null) buf.append(" " + this.getAlias()); + if (this.getTableSamplingRate() != null) buf.append(" TABLESAMPLE " + this.getTableSamplingRate()); buf.append(' '); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 4628d51..0058f38 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -398,13 +398,35 @@ public class ParseNodeFactory { return new NamedNode(name); } + @Deprecated public NamedTableNode namedTable(String alias, TableName name) { return new NamedTableNode(alias, name); } - - public NamedTableNode namedTable(String alias, TableName name ,List<ColumnDef> dyn_columns) { + + @Deprecated + public NamedTableNode namedTable(String alias, TableName name, List<ColumnDef> dyn_columns) { return new NamedTableNode(alias, name,dyn_columns); } + + public NamedTableNode namedTable(String alias, TableName name, Double tableSamplingRate) { + return new NamedTableNode(alias, name, tableSamplingRate); + } + + public NamedTableNode namedTable(String alias, TableName name, List<ColumnDef> dyn_columns, Double tableSamplingRate) { + return new NamedTableNode(alias, name,dyn_columns, tableSamplingRate); + } + + public NamedTableNode namedTable(String alias, TableName name, List<ColumnDef> dyn_columns, LiteralParseNode tableSampleNode) { + Double tableSamplingRate; + if(tableSampleNode==null||tableSampleNode.getValue()==null){ + tableSamplingRate=ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE; + }else if(tableSampleNode.getValue() 
instanceof Integer){ + tableSamplingRate=(double)((int)tableSampleNode.getValue()); +}else{ + tableSamplingRate=((BigDecimal) tableSampleNode.getValue()).doubleValue(); +} + return new NamedTableNode(alias, name, dyn_columns, tableSamplingRate); + } public BindTableNode bindTable(String alias, TableName name) { return new BindTableNode(alias, name); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java index a7c1a0b..d4f079b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java @@ -266,6 +266,21 @@ public class SelectStatement implements FilterableStatement { return limit; } + /** + * This method should not be called during the early stage + * of the plan preparation phase since fromTable might not + * be a ConcreteTableNode at that time (e.g., JoinTableNode). + * + * By the time the getTableSamplingRate method is called, + * each select statement should have exactly one ConcreteTableNode, + * with its corresponding sampling rate associated with it. + */ + @Override + public Double getTableSamplingRate(){ + if(fromTable==null || !(fromTable instanceof ConcreteTableNode)) return null; + return ((ConcreteTableNode)fromTable).getTableSamplingRate(); + } + @Override public int getBindCount() { return bindCount;
