Repository: phoenix
Updated Branches:
  refs/heads/master 534fa05da -> 5e33dc12b


PHOENIX-153 Implement TABLESAMPLE clause (Ethan Wang)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5e33dc12
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5e33dc12
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5e33dc12

Branch: refs/heads/master
Commit: 5e33dc12bc088bd0008d89f0a5cd7d5c368efa25
Parents: 534fa05
Author: James Taylor <[email protected]>
Authored: Tue Aug 1 11:52:24 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Tue Aug 1 11:52:24 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/QueryWithTableSampleIT.java | 305 +++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   7 +-
 .../apache/phoenix/compile/JoinCompiler.java    |  24 +-
 .../phoenix/iterate/BaseResultIterators.java    |  28 ++
 .../phoenix/iterate/TableSamplerPredicate.java  | 114 +++++++
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../apache/phoenix/parse/ConcreteTableNode.java |  23 ++
 .../apache/phoenix/parse/DeleteStatement.java   |   5 +
 .../phoenix/parse/FilterableStatement.java      |   1 +
 .../apache/phoenix/parse/NamedTableNode.java    |  21 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |  26 +-
 .../apache/phoenix/parse/SelectStatement.java   |  15 +
 12 files changed, 554 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java
new file mode 100644
index 0000000..a7a7952
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithTableSampleIT.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.PhoenixParserException;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class QueryWithTableSampleIT extends ParallelStatsEnabledIT {
+    private String tableName;
+    private String joinedTableName;
+    
+    @Before
+    public void generateTableNames() {
+        tableName = "T_" + generateUniqueName();
+        joinedTableName = "T_" + generateUniqueName();
+    }
+        
+    @Test(expected=PhoenixParserException.class)
+    public void testSingleQueryWrongSyntax() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT i1, i2 FROM " + tableName +" tablesample 15 
";
+
+            ResultSet rs = conn.createStatement().executeQuery(query);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test(expected=PhoenixParserException.class)
+    public void testSingleQueryWrongSamplingRate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT i1, i2 FROM " + tableName +" tablesample 
(175) ";
+
+            ResultSet rs = conn.createStatement().executeQuery(query);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQueryZeroSamplingRate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT i1, i2 FROM " + tableName +" tablesample 
(0) ";
+            ResultSet rs = conn.createStatement().executeQuery(query);         
               
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT i1, i2 FROM " + tableName +" tablesample 
(45) ";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(200, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals(6, rs.getInt(1));
+            assertEquals(600, rs.getInt(2));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQueryWithWhereClause() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT i1, i2 FROM " + tableName +" tablesample 
(22) where i2>=300 and i1<14 LIMIT 4 ";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertTrue(rs.next());
+            assertEquals(8, rs.getInt(1));
+            
+            assertTrue(rs.next());
+            assertEquals(10, rs.getInt(1));
+            
+            assertTrue(rs.next());
+            assertEquals(12, rs.getInt(1));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQueryWithAggregator() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT count(i1) FROM " + tableName +" tablesample 
(22) where i2>=3000 or i1<2 ";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertTrue(rs.next());
+            assertEquals(14, rs.getInt(1));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQueryWithUnion() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT * FROM " + tableName +" tablesample (100) 
where i1<2 union all SELECT * FROM " + tableName +" tablesample (2) where 
i2<6000";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(30, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(44, rs.getInt(1));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQueryWithSubQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT count(*) FROM (SELECT * FROM " + tableName 
+" tablesample (50))";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertTrue(rs.next());
+            assertEquals(50, rs.getInt(1));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testSingleQueryWithJoins() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "SELECT count(*) FROM " + tableName +" as A 
tablesample (45), "+joinedTableName+" as B tablesample (75) where A.i1=B.i1";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+
+            assertTrue(rs.next());
+            assertEquals(31, rs.getInt(1));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testExplainSingleQuery() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "EXPLAIN SELECT i1, i2 FROM " + tableName +" 
tablesample (45) ";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertEquals("CLIENT PARALLEL 1-WAY 0.45-SAMPLED FULL SCAN OVER " 
+ tableName + "\n" +
+                               "    SERVER FILTER BY FIRST KEY 
ONLY",QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testExplainSingleQueryWithUnion() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "EXPLAIN SELECT * FROM " + tableName +" tablesample 
(100) where i1<2 union all SELECT * FROM " + tableName +" tablesample (2) where 
i2<6000";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertEquals("UNION ALL OVER 2 QUERIES\n" +
+                                               "    CLIENT PARALLEL 1-WAY 
1.0-SAMPLED RANGE SCAN OVER " + tableName+" [*] - [2]\n"+
+                                               "        SERVER FILTER BY FIRST 
KEY ONLY\n" +
+                                               "    CLIENT PARALLEL 1-WAY 
0.02-SAMPLED FULL SCAN OVER " + tableName + "\n" +
+                                               "        SERVER FILTER BY FIRST 
KEY ONLY AND I2 < 6000",QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testExplainSingleQueryWithJoins() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            prepareTableWithValues(conn, 100);
+            String query = "EXPLAIN SELECT count(*) FROM " + tableName +" as A 
tablesample (45), "+joinedTableName+" as B tablesample (75) where A.i1=B.i1";
+            System.out.println(query);
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            
+            assertEquals("CLIENT PARALLEL 1-WAY 0.45-SAMPLED FULL SCAN OVER " 
+ tableName + "\n" +
+                                               "    SERVER FILTER BY FIRST KEY 
ONLY\n" +
+                                               "    SERVER AGGREGATE INTO 
SINGLE ROW\n" +
+                                               "    PARALLEL INNER-JOIN TABLE 
0 (SKIP MERGE)\n" +
+                                               "        CLIENT PARALLEL 1-WAY 
0.75-SAMPLED FULL SCAN OVER " + joinedTableName + "\n" +
+                                               "            SERVER FILTER BY 
FIRST KEY ONLY\n" +
+                                               "    DYNAMIC SERVER FILTER BY 
A.I1 IN (B.I1)",QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    /**
+     * Prepare tables with statistics updated. The first table has rows of the form:
+     *  i1, i2
+     *   1, 100
+     *   2, 200
+     *   3, 300
+     *   ...
+     *   
+     * @param conn
+     * @param nRows
+     * @throws Exception
+     */
+    final private void prepareTableWithValues(final Connection conn, final int 
nRows) throws Exception {
+        conn.createStatement().execute("create table " + tableName + "\n" +
+                "   (i1 integer not null, i2 integer not null\n" +
+                "    CONSTRAINT pk PRIMARY KEY (i1,i2))");
+        
+        final PreparedStatement stmt = conn.prepareStatement(
+            "upsert into " + tableName + 
+            " VALUES (?, ?)");
+        for (int i = 0; i < nRows; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i*100);
+            stmt.execute();
+        }
+        conn.commit();
+        
+        conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+        
+        //Prepare for second table
+        conn.createStatement().execute("create table " + joinedTableName + 
"\n" +
+                "   (i1 integer not null, i2 integer not null\n" +
+                "    CONSTRAINT pk PRIMARY KEY (i1,i2))");
+        
+        final PreparedStatement stmt2 = conn.prepareStatement(
+            "upsert into " + joinedTableName + 
+            " VALUES (?, ?)");
+        for (int i = 0; i < nRows; i++) {
+            stmt2.setInt(1, i);
+            stmt2.setInt(2, i*-100);
+            stmt2.execute();
+        }
+        conn.commit();
+        conn.createStatement().execute("UPDATE STATISTICS " + joinedTableName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g 
b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 66d50e9..721b514 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -114,6 +114,7 @@ tokens
     TRACE='trace';
     ASYNC='async';
     SAMPLING='sampling';
+    TABLESAMPLE='tablesample';
     UNION='union';
     FUNCTION='function';
     AS='as';
@@ -794,6 +795,10 @@ sampling_rate returns [LiteralParseNode ret]
     : l=literal { $ret = l; }
     ;
 
+tableSampleNode returns [LiteralParseNode ret]
+    : l=literal { $ret = l; }
+    ;
+
 hintClause returns [HintNode ret]
     :  c=ML_HINT { $ret = factory.hint(c.getText()); }
     ;
@@ -851,7 +856,7 @@ table_ref returns [TableNode ret]
 table_factor returns [TableNode ret]
     :   LPAREN t=table_list RPAREN { $ret = t; }
     |   n=bind_name ((AS)? alias=identifier)? { $ret = 
factory.bindTable(alias, factory.table(null,n)); } // TODO: review
-    |   f=from_table_name ((AS)? alias=identifier)? (LPAREN 
cdefs=dyn_column_defs RPAREN)? { $ret = factory.namedTable(alias,f,cdefs); }
+    |   f=from_table_name ((AS)? alias=identifier)? (LPAREN 
cdefs=dyn_column_defs RPAREN)? (TABLESAMPLE LPAREN tableSample=tableSampleNode 
RPAREN)? { $ret = factory.namedTable(alias,f,cdefs, tableSample);}
     |   LPAREN s=select_node RPAREN ((AS)? alias=identifier)? { $ret = 
factory.derivedTable(alias, s); }
     ;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 49de75b..887e2d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -50,6 +50,7 @@ import org.apache.phoenix.parse.BooleanParseNodeVisitor;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.ColumnParseNode;
 import org.apache.phoenix.parse.ComparisonParseNode;
+import org.apache.phoenix.parse.ConcreteTableNode;
 import org.apache.phoenix.parse.DerivedTableNode;
 import org.apache.phoenix.parse.EqualParseNode;
 import org.apache.phoenix.parse.HintNode;
@@ -185,7 +186,7 @@ public class JoinCompiler {
         public Pair<Table, List<JoinSpec>> visit(BindTableNode boundTableNode) 
throws SQLException {
             TableRef tableRef = resolveTable(boundTableNode.getAlias(), 
boundTableNode.getName());
             List<AliasedNode> selectNodes = 
extractFromSelect(select.getSelect(), tableRef, origResolver);
-            Table table = new Table(boundTableNode, 
Collections.<ColumnDef>emptyList(), selectNodes, tableRef);
+            Table table = new Table(boundTableNode, 
Collections.<ColumnDef>emptyList(), boundTableNode.getTableSamplingRate(), 
selectNodes, tableRef);
             return new Pair<Table, List<JoinSpec>>(table, null);
         }
 
@@ -208,7 +209,7 @@ public class JoinCompiler {
                 throws SQLException {
             TableRef tableRef = resolveTable(namedTableNode.getAlias(), 
namedTableNode.getName());
             List<AliasedNode> selectNodes = 
extractFromSelect(select.getSelect(), tableRef, origResolver);
-            Table table = new Table(namedTableNode, 
namedTableNode.getDynamicColumns(), selectNodes, tableRef);
+            Table table = new Table(namedTableNode, 
namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate(), 
selectNodes, tableRef);
             return new Pair<Table, List<JoinSpec>>(table, null);
         }
 
@@ -614,6 +615,7 @@ public class JoinCompiler {
     public class Table {
         private final TableNode tableNode;
         private final List<ColumnDef> dynamicColumns;
+        private final Double tableSamplingRate;
         private final SelectStatement subselect;
         private final TableRef tableRef;
         private final List<AliasedNode> selectNodes; // all basic nodes 
related to this table, no aggregation.
@@ -621,10 +623,11 @@ public class JoinCompiler {
         private final List<ParseNode> postFilters;
         private final boolean isPostFilterConvertible;
 
-        private Table(TableNode tableNode, List<ColumnDef> dynamicColumns,
+        private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, 
Double tableSamplingRate,
                 List<AliasedNode> selectNodes, TableRef tableRef) {
             this.tableNode = tableNode;
             this.dynamicColumns = dynamicColumns;
+            this.tableSamplingRate=tableSamplingRate;
             this.subselect = null;
             this.tableRef = tableRef;
             this.selectNodes = selectNodes;
@@ -637,6 +640,7 @@ public class JoinCompiler {
                 List<AliasedNode> selectNodes, TableRef tableRef) throws 
SQLException {
             this.tableNode = tableNode;
             this.dynamicColumns = Collections.<ColumnDef>emptyList();
+            
this.tableSamplingRate=ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE;
             this.subselect = SubselectRewriter.flatten(tableNode.getSelect(), 
statement.getConnection());
             this.tableRef = tableRef;
             this.selectNodes = selectNodes;
@@ -652,6 +656,10 @@ public class JoinCompiler {
         public List<ColumnDef> getDynamicColumns() {
             return dynamicColumns;
         }
+        
+        public Double getTableSamplingRate() {
+            return tableSamplingRate;
+        }
 
         public boolean isSubselect() {
             return subselect != null;
@@ -1188,7 +1196,7 @@ public class JoinCompiler {
             TableRef tableRef = table.getTableRef();
             List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? 
select.getGroupBy() : null;
             List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? 
select.getOrderBy() : null;
-            SelectStatement stmt = 
getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), 
tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, 
orderBy, table.isWildCardSelect(), select.hasSequence(), 
select.getUdfParseNodes());
+            SelectStatement stmt = 
getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), 
table.getTableSamplingRate(), tableRef, join.getColumnRefs(), 
table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), 
select.hasSequence(), select.getUdfParseNodes());
             QueryPlan plan = 
statement.getConnection().getQueryServices().getOptimizer().optimize(statement, 
stmt);
             if (!plan.getTableRef().equals(tableRef)) {
                 replacement.put(tableRef, plan.getTableRef());
@@ -1244,7 +1252,7 @@ public class JoinCompiler {
                     return namedTableNode;
 
                 String alias = namedTableNode.getAlias();
-                return NODE_FACTORY.namedTable(alias == null ? null : '"' + 
alias + '"', getReplacedTableName(replaceRef), 
namedTableNode.getDynamicColumns());
+                return NODE_FACTORY.namedTable(alias == null ? null : '"' + 
alias + '"', getReplacedTableName(replaceRef), 
namedTableNode.getDynamicColumns(), namedTableNode.getTableSamplingRate());
             }
 
             @Override
@@ -1258,11 +1266,11 @@ public class JoinCompiler {
         for ( TableRef indexTableRef : replacement.values()) {
             // replace expressions with corresponding matching columns for 
functional indexes
             indexSelect = ParseNodeRewriter.rewrite(indexSelect, new  
IndexExpressionParseNodeRewriter(indexTableRef.getTable(), 
indexTableRef.getTableAlias(), statement.getConnection(), 
indexSelect.getUdfParseNodes()));
-        }
+        } 
         return indexSelect;
     }
 
-    private static SelectStatement getSubqueryForOptimizedPlan(HintNode 
hintNode, List<ColumnDef> dynamicCols, TableRef tableRef, Map<ColumnRef, 
ColumnRefType> columnRefs, ParseNode where, List<ParseNode> groupBy,
+    private static SelectStatement getSubqueryForOptimizedPlan(HintNode 
hintNode, List<ColumnDef> dynamicCols, Double tableSamplingRate, TableRef 
tableRef, Map<ColumnRef, ColumnRefType> columnRefs, ParseNode where, 
List<ParseNode> groupBy,
             List<OrderByNode> orderBy, boolean isWildCardSelect, boolean 
hasSequence, Map<String, UDFParseNode> udfParseNodes) {
         String schemaName = tableRef.getTable().getSchemaName().getString();
         TableName tName = TableName.create(schemaName.length() == 0 ? null : 
schemaName, tableRef.getTable().getTableName().getString());
@@ -1281,7 +1289,7 @@ public class JoinCompiler {
             }
         }
         String tableAlias = tableRef.getTableAlias();
-        TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : 
'"' + tableAlias + '"', tName, dynamicCols);
+        TableNode from = NODE_FACTORY.namedTable(tableAlias == null ? null : 
'"' + tableAlias + '"', tName, dynamicCols,tableSamplingRate);
 
         return NODE_FACTORY.select(from, hintNode, false, selectList, where, 
groupBy, null, orderBy, null, null, 0,
                 groupBy != null, hasSequence, Collections.<SelectStatement> 
emptyList(), udfParseNodes);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 8d6c107..f1db46c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -113,6 +113,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -782,9 +783,32 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         } finally {
             if (stream != null) Closeables.closeQuietly(stream);
         }
+        
+        
sampleScans(parallelScans,this.plan.getStatement().getTableSamplingRate());
         return parallelScans;
     }
 
+    /**
+     * Loop through List<List<Scan>> parallelScans object, 
+     * rolling dice on each scan based on startRowKey.
+     * 
+     * Every FilterableStatement should carry a tableSamplingRate. 
+     * If it is a delete statement, an unsupported-operation message is raised. 
+     * If tableSamplingRate is null, a 100% sampling rate is applied by 
default.
+     *  
+     * @param parallelScans
+     */
+    private void sampleScans(final List<List<Scan>> parallelScans, final 
Double tableSamplingRate){
+       if(tableSamplingRate==null||tableSamplingRate==100d) return;
+       final Predicate<byte[]> 
tableSamplerPredicate=TableSamplerPredicate.of(tableSamplingRate);
+       
+       for(Iterator<List<Scan>> is = parallelScans.iterator();is.hasNext();){
+               for(Iterator<Scan> i=is.next().iterator();i.hasNext();){
+                       final Scan scan=i.next();
+                       
if(!tableSamplerPredicate.apply(scan.getStartRow())){i.remove();}
+               }
+       }
+    }
    
     public static <T> List<T> reverseIfNecessary(List<T> list, boolean 
reverse) {
         if (!reverse) {
@@ -1070,6 +1094,10 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
             }
         }
         buf.append(getName()).append(" ").append(size()).append("-WAY ");
+        
+        if(this.plan.getStatement().getTableSamplingRate()!=null){
+               
buf.append(plan.getStatement().getTableSamplingRate()/100D).append("-").append("SAMPLED
 ");
+        }
         try {
             if (plan.useRoundRobinIterator()) {
                 buf.append("ROUND ROBIN ");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java
new file mode 100644
index 0000000..7ee0b6b
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSamplerPredicate.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import com.google.common.base.Predicate;
+
+/**
+ * TableSampler.
+ * 
+ * A predicate evaluated on every targeted row to decide whether the row 
+ * is picked or not.
+ * One application is table sampling: based on the boolean result, the row 
+ * is then included in (or excluded from) the sample set.
+ * 
+ * Implemented as a hash with the lazy mod mapping method to ensure
+ * an even distribution of hashed results, so that the final sampled result 
+ * is close to the expected size.
+ * 
+ */
+public class TableSamplerPredicate implements Predicate<byte[]>{
+       private final double tableSamplingRate;
+       
+       private TableSamplerPredicate(double tableSamplingRate){
+               this.tableSamplingRate=tableSamplingRate;
+       }
+               
+       public static TableSamplerPredicate of(final Double 
tableSamplingRateRaw){
+               assert(tableSamplingRateRaw!=null):"tableSamplingRate can not 
be null";
+               
assert(tableSamplingRateRaw>=0d&&tableSamplingRateRaw<=100d):"tableSamplingRate 
input has to be a rational number between 0 and 100";
+               TableSamplerPredicate self=new 
TableSamplerPredicate(tableSamplingRateRaw);
+               return self;
+       }       
+       
+       @Override
+       public boolean apply(byte[] bytes) {
+               final int hashcode_FNV1Lazy=FNV1LazyImpl(bytes);
+               final boolean result=evaluateWithChance(hashcode_FNV1Lazy);
+       return result;
+       }
+       
+       /**
+        * Take build in FNV1a Hash function then apply lazy mod mapping method 
so that the 
+        * hash is evenly distributed between 0 and 100.
+        * 
+        * Quoted from http://isthe.com/chongo/tech/comp/fnv/, 
+        * The FNV hash is designed for hash sizes that are a power of 2. 
+        * If you need a hash size that is not a power of two, then you have 
two choices. 
+        * One method is called the lazy mod mapping method and the other is 
called the retry method. 
+        * Both involve mapping a range that is a power of 2 onto an arbitrary 
range.
+        * 
+        * Lazy mod mapping method: The lazy mod mapping method uses a simple 
mod on an n-bit hash 
+        * to yield an arbitrary range. 
+        * To produce a hash range between 0 and X use an n-bit FNV hash where 
n is the smallest FNV hash size 
+        * that will produce values larger than X without the need for 
xor-folding.
+        * 
+        * For example, to produce a value between 0 and 49999 using the 
lazy mod mapping method, 
+        * we select a 32-bit FNV hash because: 2 power 32 > 49999
+        * Before the final mod 50000 is performed, 
+        * we check to see if the 32-bit FNV hash value is one of the upper 
biased values. 
+        * If it is, we perform additional loop cycles until it is below the 
bias level.
+        * 
+        * An advantage of the lazy mod mapping method is that it requires only 
1 more operation: 
+        * only an additional mod is performed at the end.
+        * The disadvantage of the lazy mod mapping method is that there is a 
bias against 
+        * the larger values.
+        * 
+        * @param bytes
+        * @return
+        */
+       final static private int FNV1LazyImpl(final byte[] bytes){
+               final int contentBasedHashCode = 
java.util.Arrays.hashCode(bytes);
+               return lazyRedistribute(contentBasedHashCode);
+       }
+       
+       
+       /**
+        * Lazy mod mapping method Implementation
+        * 
+        * Output result should be following the same distribution as input 
hashcode, 
+        * however re-mapped between 0 and 100.
+        * 
+        * @param hashcode
+        * @return 
+        */
+       final static private int lazyRedistribute(final int hashcode){
+               return java.lang.Math.abs(hashcode%100);
+       }
+       
+       /**
+        * 
+        * @param hashcode
+        * @return
+        */
+    final private boolean evaluateWithChance(final int hashcode){
+       assert((hashcode>=0)&&(hashcode<=100)):"hashcode should be 
re-distribute into 0 to 100";
+       return (hashcode<tableSamplingRate)?true:false;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 14b7945..2bfc5fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -221,7 +221,7 @@ public class QueryOptimizer {
         schemaName = schemaName.length() == 0 ? null :  '"' + schemaName + '"';
 
         String tableName = '"' + index.getTableName().getString() + '"';
-        TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, 
tableName));
+        TableNode table = FACTORY.namedTable(alias, FACTORY.table(schemaName, 
tableName),select.getTableSamplingRate());
         SelectStatement indexSelect = FACTORY.select(select, table);
         ColumnResolver resolver = 
FromCompiler.getResolverForQuery(indexSelect, statement.getConnection());
         // We will or will not do tuple projection according to the data plan.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java
index 640ee7b..6a4ed6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ConcreteTableNode.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.parse;
 
+import org.apache.commons.math3.exception.OutOfRangeException;
 import org.apache.phoenix.util.SchemaUtil;
 
 /**
@@ -27,22 +28,41 @@ import org.apache.phoenix.util.SchemaUtil;
  * @since 0.1
  */
 public abstract class ConcreteTableNode extends TableNode {
    // Default when no TABLESAMPLE clause is given: null means "no sampling".
    // An alternative encoding would be 100d (sample 100% of rows).
    public static final Double DEFAULT_TABLE_SAMPLING_RATE=null;
    private final TableName name;
    // Sampling percentage in [0, 100]; null when no TABLESAMPLE clause was given.
    private final Double tableSamplingRate;
     
     ConcreteTableNode(String alias, TableName name) {
+       this(alias,name,DEFAULT_TABLE_SAMPLING_RATE);
+    }
+    
+    ConcreteTableNode(String alias, TableName name, Double tableSamplingRate) {
         super(SchemaUtil.normalizeIdentifier(alias));
         this.name = name;
+        if(tableSamplingRate==null){
+               this.tableSamplingRate=DEFAULT_TABLE_SAMPLING_RATE;
+        }else if(tableSamplingRate<0d||tableSamplingRate>100d){
+               throw new OutOfRangeException(tableSamplingRate, 0d, 100d);
+        }else{
+               this.tableSamplingRate=tableSamplingRate;
+        }
     }
 
    /** Returns the name of the table this node refers to. */
    public TableName getName() {
        return name;
    }
+    
    /** Returns the sampling percentage in [0, 100], or null when no TABLESAMPLE clause was given. */
    public Double getTableSamplingRate(){
        return tableSamplingRate;
    }
 
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + ((tableSamplingRate == null) ? 0 : 
tableSamplingRate.hashCode());
         return result;
     }
 
@@ -55,6 +75,9 @@ public abstract class ConcreteTableNode extends TableNode {
         if (name == null) {
             if (other.name != null) return false;
         } else if (!name.equals(other.name)) return false;
+        if (tableSamplingRate == null) {
+            if (other.tableSamplingRate != null) return false;
+        } else if (!tableSamplingRate.equals(other.tableSamplingRate)) return 
false;
         return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
index fb2d327..331bee4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
@@ -76,5 +76,10 @@ public class DeleteStatement extends DMLStatement implements 
FilterableStatement
     public OffsetNode getOffset() {
         return null;
     }
+    
+    @Override
+    public Double getTableSamplingRate(){
+       throw new UnsupportedOperationException("Table sampling is not allowd 
for Deletion");
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
index ad54d98..62a4aa2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/FilterableStatement.java
@@ -25,6 +25,7 @@ public interface FilterableStatement extends 
BindableStatement {
     public boolean isDistinct();
     public boolean isAggregate();
     public List<OrderByNode> getOrderBy();
+    public Double getTableSamplingRate();
     public LimitNode getLimit();
     public OffsetNode getOffset();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
index d3b4505..7944a04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
@@ -34,7 +34,7 @@ import com.google.common.collect.ImmutableList;
 public class NamedTableNode extends ConcreteTableNode {
 
     private final List<ColumnDef> dynColumns;
-
+    
     public static NamedTableNode create (String alias, TableName name, 
List<ColumnDef> dynColumns) {
         return new NamedTableNode(alias, name, dynColumns);
     }
@@ -47,13 +47,23 @@ public class NamedTableNode extends ConcreteTableNode {
         return new NamedTableNode(null, TableName.create(schemaName, 
tableName), Collections.<ColumnDef>emptyList());
     }
     
    /** @deprecated Use {@link #NamedTableNode(String, TableName, Double)} so a sampling rate can be carried. */
    @Deprecated
    NamedTableNode(String alias, TableName name) {
        this(alias, name, ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE);
    }
-
+    
    /** @deprecated Use {@link #NamedTableNode(String, TableName, List, Double)} so a sampling rate can be carried. */
    @Deprecated
    NamedTableNode(String alias, TableName name, List<ColumnDef> dynColumns) {
        this(alias,name,dynColumns,ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE);
    }
+    
    /** Creates a node with no dynamic columns and the given sampling rate (null = no TABLESAMPLE). */
    NamedTableNode(String alias, TableName name, Double tableSamplingRate) {
        super(alias, name, tableSamplingRate);
        dynColumns = Collections.<ColumnDef> emptyList();
    }
+    
+    NamedTableNode(String alias, TableName name, List<ColumnDef> dynColumns, 
Double tableSamplingRate) {
+        super(alias, name, tableSamplingRate);
         if (dynColumns != null) {
             this.dynColumns = ImmutableList.copyOf(dynColumns);
         } else {
@@ -83,6 +93,7 @@ public class NamedTableNode extends ConcreteTableNode {
             buf.append(')');
         }
         if (this.getAlias() != null) buf.append(" " + this.getAlias());
+        if (this.getTableSamplingRate() != null) buf.append(" TABLESAMPLE " + 
this.getTableSamplingRate());
         buf.append(' ');
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4628d51..0058f38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -398,13 +398,35 @@ public class ParseNodeFactory {
         return new NamedNode(name);
     }
 
    /** @deprecated Use {@link #namedTable(String, TableName, Double)} so a sampling rate can be carried. */
    @Deprecated
    public NamedTableNode namedTable(String alias, TableName name) {
        return new NamedTableNode(alias, name);
    }
-
-    public NamedTableNode namedTable(String alias, TableName name 
,List<ColumnDef> dyn_columns) {
+    
    /** @deprecated Use {@link #namedTable(String, TableName, List, Double)} so a sampling rate can be carried. */
    @Deprecated
    public NamedTableNode namedTable(String alias, TableName name, List<ColumnDef> dyn_columns) {
        return new NamedTableNode(alias, name,dyn_columns);
    }
+    
    /** Creates a named table node with an explicit sampling rate (null = no TABLESAMPLE). */
    public NamedTableNode namedTable(String alias, TableName name, Double tableSamplingRate) {
        return new NamedTableNode(alias, name, tableSamplingRate);
    }
+    
    /** Creates a named table node with dynamic columns and an explicit sampling rate (null = no TABLESAMPLE). */
    public NamedTableNode namedTable(String alias, TableName name, List<ColumnDef> dyn_columns, Double tableSamplingRate) {
        return new NamedTableNode(alias, name,dyn_columns, tableSamplingRate);
    }
+    
+    public NamedTableNode namedTable(String alias, TableName name, 
List<ColumnDef> dyn_columns, LiteralParseNode tableSampleNode) {
+       Double tableSamplingRate;
+       if(tableSampleNode==null||tableSampleNode.getValue()==null){
+               tableSamplingRate=ConcreteTableNode.DEFAULT_TABLE_SAMPLING_RATE;
+       }else if(tableSampleNode.getValue() instanceof Integer){
+               tableSamplingRate=(double)((int)tableSampleNode.getValue());
+       }else{
+               tableSamplingRate=((BigDecimal) 
tableSampleNode.getValue()).doubleValue();
+       }
+       return new NamedTableNode(alias, name, dyn_columns, tableSamplingRate);
+    }
 
    /** Creates a {@link BindTableNode} for the given alias and table name. */
    public BindTableNode bindTable(String alias, TableName name) {
        return new BindTableNode(alias, name);
    }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5e33dc12/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index a7c1a0b..d4f079b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -266,6 +266,21 @@ public class SelectStatement implements 
FilterableStatement {
         return limit;
     }
     
+    /**
+     *  This method should not be called during the early stage 
+     *  of the plan preparation phase since fromTable might not 
+     *  be ConcreteTableNode at that time(e.g., JoinTableNode).
+     *  
+     *  By the time getTableSamplingRate method is called, 
+     *  each select statements should have exactly one ConcreteTableNode,
+     *  with its corresponding sampling rate associate with it.
+     */
+    @Override
+    public Double getTableSamplingRate(){
+       if(fromTable==null || !(fromTable instanceof ConcreteTableNode)) return 
null;
+       return ((ConcreteTableNode)fromTable).getTableSamplingRate();
+    }
+    
     @Override
     public int getBindCount() {
         return bindCount;

Reply via email to