Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 1e83415f1 -> 9bb7811f0


PHOENIX-4288 Indexes not used when ordering by primary key


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/541d6ac2
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/541d6ac2
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/541d6ac2

Branch: refs/heads/4.x-HBase-0.98
Commit: 541d6ac22866fe7571365e063a23108c6ca1ea63
Parents: 1e83415
Author: maryannxue <maryann....@gmail.com>
Authored: Tue Dec 5 10:52:46 2017 -0800
Committer: maryannxue <maryann....@gmail.com>
Committed: Tue Mar 13 17:16:04 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CostBasedDecisionIT.java    | 466 +++++++++++++++++++
 .../phoenix/compile/ListJarsQueryPlan.java      |   6 +
 .../org/apache/phoenix/compile/QueryPlan.java   |   5 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   6 +
 .../apache/phoenix/execute/AggregatePlan.java   |  30 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  19 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  28 ++
 .../apache/phoenix/execute/ClientScanPlan.java  |  25 +
 .../apache/phoenix/execute/CorrelatePlan.java   |  25 +
 .../phoenix/execute/DelegateQueryPlan.java      |   6 +
 .../apache/phoenix/execute/HashJoinPlan.java    |  29 ++
 .../execute/LiteralResultIterationPlan.java     |   6 +
 .../org/apache/phoenix/execute/ScanPlan.java    |  25 +
 .../phoenix/execute/SortMergeJoinPlan.java      |  18 +
 .../org/apache/phoenix/execute/UnionPlan.java   |  10 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   6 +
 .../java/org/apache/phoenix/optimize/Cost.java  | 123 +++++
 .../apache/phoenix/optimize/QueryOptimizer.java |  28 +-
 .../org/apache/phoenix/query/QueryServices.java |   2 +
 .../phoenix/query/QueryServicesOptions.java     |   6 +-
 .../java/org/apache/phoenix/util/CostUtil.java  |  90 ++++
 .../query/ParallelIteratorsSplitTest.java       |   6 +
 22 files changed, 951 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
new file mode 100644
index 0000000..a3584ce
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CostBasedDecisionIT.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CostBasedDecisionIT extends BaseUniqueNamesOwnClusterIT {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 
Long.toString(20));
+        props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(true));
+        props.put(QueryServices.COST_BASED_OPTIMIZER_ENABLED, 
Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering1() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where 
c1 LIKE 'X0%' ORDER BY rowkey";
+            // Use the data table plan that opts out order-by when stats are 
not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("FULL SCAN"));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the index table plan that has a lower cost when stats 
become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("RANGE SCAN"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering2() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, max(c1), max(c2) FROM " + tableName 
+ " where c1 LIKE 'X%' GROUP BY rowkey";
+            // Use the index table plan that opts out order-by when stats are 
not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'RANGE SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("RANGE SCAN"));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Given that the range on C1 is meaningless and group-by becomes
+            // order-preserving if using the data table, the data table plan 
should
+            // come out as the best plan based on the costs.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected 'FULL SCAN' in the plan:\n" + plan + ".",
+                    plan.contains("FULL SCAN"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrdering3() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "SELECT * FROM " + tableName + " where c1 BETWEEN 
10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Use the idx2 plan with a wider PK slot span when stats are not 
available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " 
[2,*] - [2,9,000]\n" +
+                    "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 20) AND 
TO_INTEGER(\"C3\") < 5000)\n" +
+                    "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the idx1 plan that scans less data when stats become 
available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan =
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " 
[1,10] - [1,20]\n" +
+                    "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 5000)\n" 
+
+                    "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInUpsertQuery() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "UPSERT INTO " + tableName + " SELECT * FROM " + 
tableName + " where c1 BETWEEN 10 AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Use the idx2 plan with a wider PK slot span when stats are not 
available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "UPSERT SELECT\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " 
[2,*] - [2,9,000]\n" +
+                            "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 
20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the idx1 plan that scans less data when stats become 
available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan =
+                    "UPSERT SELECT\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " 
[1,10] - [1,20]\n" +
+                            "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 
5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInDeleteQuery() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 INTEGER,\n" +
+                    "c2 INTEGER,\n" +
+                    "c3 INTEGER)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx1 ON " + tableName + " (c1) INCLUDE (c2, c3)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx2 ON " + tableName + " (c2, c3) INCLUDE (c1)");
+
+            String query = "DELETE FROM " + tableName + " where c1 BETWEEN 10 
AND 20 AND c2 < 9000 AND C3 < 5000";
+            // Use the idx2 plan with a wider PK slot span when stats are not 
available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String indexPlan =
+                    "DELETE ROWS\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " 
[2,*] - [2,9,000]\n" +
+                            "    SERVER FILTER BY ((\"C1\" >= 10 AND \"C1\" <= 
20) AND TO_INTEGER(\"C3\") < 5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2, c3) VALUES (?, ?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                stmt.setString(1, "k" + i);
+                stmt.setInt(2, i);
+                stmt.setInt(3, i);
+                stmt.setInt(4, i);
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the idx1 plan that scans less data when stats become 
available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String dataPlan =
+                    "DELETE ROWS\n" +
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + " 
[1,10] - [1,20]\n" +
+                            "    SERVER FILTER BY (\"C2\" < 9000 AND \"C3\" < 
5000)\n" +
+                            "CLIENT MERGE SORT";
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(dataPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInUnionQuery() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT c1, max(rowkey), max(c2) FROM " + tableName 
+ " where rowkey LIKE 'k%' GROUP BY c1 "
+                    + "UNION ALL SELECT rowkey, max(c1), max(c2) FROM " + 
tableName + " where c1 LIKE 'X%' GROUP BY rowkey";
+            // Use the default plan when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String defaultPlan =
+                    "UNION ALL OVER 2 QUERIES\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + 
" ['k'] - ['l']\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY [C1]\n" +
+                    "    CLIENT MERGE SORT\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + 
" [1,'X'] - [1,'Y']\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "        SERVER AGGREGATE INTO DISTINCT ROWS BY 
[\"ROWKEY\"]\n" +
+                    "    CLIENT MERGE SORT";
+            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan 
+ ".",
+                    plan.contains(defaultPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the optimal plan based on cost when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String optimizedPlan =
+                    "UNION ALL OVER 2 QUERIES\n" +
+                    "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + tableName + 
" [1]\n" +
+                    "        SERVER FILTER BY FIRST KEY ONLY AND \"ROWKEY\" 
LIKE 'k%'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY 
[\"C1\"]\n" +
+                    "    CLIENT MERGE SORT\n" +
+                    "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + 
"\n" +
+                    "        SERVER FILTER BY C1 LIKE 'X%'\n" +
+                    "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY 
[ROWKEY]";
+            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + 
plan + ".",
+                    plan.contains(optimizedPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testCostOverridesStaticPlanOrderingInJoinQuery() throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey VARCHAR PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT t1.rowkey, t1.c1, t1.c2, mc1, mc2 FROM " + 
tableName + " t1 "
+                    + "JOIN (SELECT rowkey, max(c1) mc1, max(c2) mc2 FROM " + 
tableName + " where c1 LIKE 'X%' GROUP BY rowkey) t2 "
+                    + "ON t1.rowkey = t2.rowkey WHERE t1.c1 LIKE 'X0%' ORDER 
BY t1.rowkey";
+            // Use the default plan when stats are not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            String defaultPlan =
+                    "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n" 
+
+                    "    SERVER FILTER BY C1 LIKE 'X0%'\n" +
+                    "    PARALLEL INNER-JOIN TABLE 0\n" +
+                    "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + 
tableName + " [1,'X'] - [1,'Y']\n" +
+                    "            SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "            SERVER AGGREGATE INTO DISTINCT ROWS BY 
[\"ROWKEY\"]\n" +
+                    "        CLIENT MERGE SORT\n" +
+                    "    DYNAMIC SERVER FILTER BY T1.ROWKEY IN (T2.ROWKEY)";
+            assertTrue("Expected '" + defaultPlan + "' in the plan:\n" + plan 
+ ".",
+                    plan.contains(defaultPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setString(1, "k" + i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the optimal plan based on cost when stats become available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            String optimizedPlan =
+                    "CLIENT PARALLEL 626-WAY RANGE SCAN OVER " + tableName + " 
[1,'X0'] - [1,'X1']\n" +
+                    "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "    SERVER SORTED BY [\"T1.:ROWKEY\"]\n" +
+                    "CLIENT MERGE SORT\n" +
+                    "    PARALLEL INNER-JOIN TABLE 0\n" +
+                    "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
tableName + "\n" +
+                    "            SERVER FILTER BY C1 LIKE 'X%'\n" +
+                    "            SERVER AGGREGATE INTO ORDERED DISTINCT ROWS 
BY [ROWKEY]\n" +
+                    "    DYNAMIC SERVER FILTER BY \"T1.:ROWKEY\" IN 
(T2.ROWKEY)";
+            assertTrue("Expected '" + optimizedPlan + "' in the plan:\n" + 
plan + ".",
+                    plan.contains(optimizedPlan));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testHintOverridesCost() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        try {
+            String tableName = BaseTest.generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + tableName + " 
(\n" +
+                    "rowkey INTEGER PRIMARY KEY,\n" +
+                    "c1 VARCHAR,\n" +
+                    "c2 VARCHAR)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + tableName + 
"_idx ON " + tableName + " (c1)");
+
+            String query = "SELECT rowkey, c1, c2 FROM " + tableName + " where 
rowkey between 1 and 10 ORDER BY c1";
+            String hintedQuery = query.replaceFirst("SELECT",
+                    "SELECT  /*+ INDEX(" + tableName + " " + tableName + 
"_idx) */");
+            String dataPlan = "SERVER SORTED BY [C1]";
+            String indexPlan = "SERVER FILTER BY FIRST KEY ONLY AND 
(\"ROWKEY\" >= 1 AND \"ROWKEY\" <= 10)";
+
+            // Use the index table plan that opts out order-by when stats are 
not available.
+            ResultSet rs = conn.createStatement().executeQuery("explain " + 
query);
+            String plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(indexPlan));
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " (rowkey, c1, c2) VALUES (?, ?, ?)");
+            for (int i = 0; i < 10000; i++) {
+                int c1 = i % 16;
+                stmt.setInt(1, i);
+                stmt.setString(2, "X" + Integer.toHexString(c1) + c1);
+                stmt.setString(3, "c");
+                stmt.execute();
+            }
+
+            conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+
+            // Use the data table plan that has a lower cost when stats are 
available.
+            rs = conn.createStatement().executeQuery("explain " + query);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + dataPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(dataPlan));
+
+            // Use the index table plan as has been hinted.
+            rs = conn.createStatement().executeQuery("explain " + hintedQuery);
+            plan = QueryUtil.getExplainPlan(rs);
+            assertTrue("Expected '" + indexPlan + "' in the plan:\n" + plan + 
".",
+                    plan.contains(indexPlan));
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 1888114..fa48e52 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -186,6 +187,11 @@ public class ListJarsQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public TableRef getTableRef() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index f7cdcbf..ca88984 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -26,6 +26,7 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -52,7 +53,9 @@ public interface QueryPlan extends StatementPlan {
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException;
 
     public long getEstimatedSize();
-    
+
+    public Cost getCost();
+
     // TODO: change once joins are supported
     TableRef getTableRef();
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 2eab965..5cd1d08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.metrics.MetricInfo;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
@@ -200,6 +201,11 @@ public class TraceQueryPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        return Cost.ZERO;
+    }
+
+    @Override
     public Set<TableRef> getSourceRefs() {
         return Collections.emptySet();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 37e0c5a..2e042e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
 import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.KeyRange;
@@ -67,6 +68,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,7 +114,33 @@ public class AggregatePlan extends BaseQueryPlan {
     public Expression getHaving() {
         return having;
     }
-    
+
+    /**
+     * Estimates the cost of this aggregation from the estimated bytes to
+     * scan, adding an ORDER BY cost over the estimated aggregate output
+     * when the plan has ORDER BY expressions.
+     *
+     * @return the estimated cost, or {@link Cost#UNKNOWN} when no byte
+     *         estimate is available
+     */
+    @Override
+    public Cost getCost() {
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate is reported as unknown below.
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        int parallelism = CostUtil.estimateParallelLevel(
+                true, context.getConnection().getQueryServices());
+        Cost aggregateCost = CostUtil.estimateAggregateCost(bytesToScan,
+                groupBy, aggregators.getEstimatedByteSize(), parallelism);
+        if (orderBy.getOrderByExpressions().isEmpty()) {
+            return aggregateCost;
+        }
+
+        // The ORDER BY cost is based on the aggregate output size, not on
+        // the number of bytes scanned.
+        double aggregatedBytes = CostUtil.estimateAggregateOutputBytes(
+                bytesToScan, groupBy, aggregators.getEstimatedByteSize());
+        Cost sortCost =
+                CostUtil.estimateOrderByCost(aggregatedBytes, parallelism);
+        return aggregateCost.plus(sortCost);
+    }
+
     @Override
     public List<KeyRange> getSplits() {
         if (splits == null)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index df55e63..38ed926 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -504,13 +504,24 @@ public abstract class BaseQueryPlan implements QueryPlan {
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN 
OVER " + getTableRef().getTable().getName().getString()));
         }
-        
+
+        // If cost-based optimizer is enabled, we need to initialize a dummy 
iterator to
+        // get the stats for computing costs.
+        boolean costBased =
+                
context.getConnection().getQueryServices().getConfiguration().getBoolean(
+                        QueryServices.COST_BASED_OPTIMIZER_ENABLED, 
QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
+        if (costBased) {
+            ResultIterator iterator = iterator();
+            iterator.close();
+        }
         // Optimize here when getting explain plan, as queries don't get 
optimized until after compilation
         QueryPlan plan = 
context.getConnection().getQueryServices().getOptimizer().optimize(context.getStatement(),
 this);
         ExplainPlan exp = plan instanceof BaseQueryPlan ? new 
ExplainPlan(getPlanSteps(plan.iterator())) : plan.getExplainPlan();
-        this.estimatedRows = plan.getEstimatedRowsToScan();
-        this.estimatedSize = plan.getEstimatedBytesToScan();
-        this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+        if (!costBased) { // do not override estimates if they are used for 
cost calculation.
+            this.estimatedRows = plan.getEstimatedRowsToScan();
+            this.estimatedSize = plan.getEstimatedBytesToScan();
+            this.estimateInfoTimestamp = plan.getEstimateInfoTimestamp();
+        }
         return exp;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 8ef1f8d..a15ab35 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -56,12 +56,14 @@ import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.TupleUtil;
 
 import com.google.common.collect.Lists;
@@ -87,6 +89,32 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // Cost of this aggregation step (plus an ORDER BY cost when the plan
+        // has ORDER BY expressions), added to the cost reported by the
+        // superclass. Unknown when no byte estimate is available.
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate is reported as unknown below.
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        int parallelism = CostUtil.estimateParallelLevel(
+                false, context.getConnection().getQueryServices());
+        Cost localCost = CostUtil.estimateAggregateCost(bytesToScan,
+                groupBy, clientAggregators.getEstimatedByteSize(),
+                parallelism);
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            double aggregatedBytes = CostUtil.estimateAggregateOutputBytes(
+                    bytesToScan, groupBy,
+                    clientAggregators.getEstimatedByteSize());
+            localCost = localCost.plus(
+                    CostUtil.estimateOrderByCost(aggregatedBytes, parallelism));
+        }
+        Cost inheritedCost = super.getCost();
+        return inheritedCost.plus(localCost);
+    }
+
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 6bbc545..5799990 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -34,10 +34,12 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
 
 import com.google.common.collect.Lists;
 
@@ -50,6 +52,29 @@ public class ClientScanPlan extends ClientProcessingPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // I/O cost of the estimated bytes (plus an ORDER BY cost when the
+        // plan has ORDER BY expressions), added to the cost reported by the
+        // superclass. Unknown when no byte estimate is available.
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate is reported as unknown below.
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost localCost = new Cost(0, 0, bytesToScan);
+        int parallelism = CostUtil.estimateParallelLevel(
+                false, context.getConnection().getQueryServices());
+        if (!orderBy.getOrderByExpressions().isEmpty()) {
+            localCost = localCost.plus(
+                    CostUtil.estimateOrderByCost(bytesToScan, parallelism));
+        }
+        Cost inheritedCost = super.getCost();
+        return inheritedCost.plus(localCost);
+    }
+
+    @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
         ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index ee81c36..270ad3d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
@@ -200,4 +201,28 @@ public class CorrelatePlan extends DelegateQueryPlan {
         return null;
     }
 
+    /**
+     * Estimates the cost of this correlate plan: an I/O term equal to the
+     * left side's estimated bytes multiplied by the right side's estimated
+     * rows, plus the costs reported by both inputs.
+     *
+     * @return the estimated cost, or {@link Cost#UNKNOWN} when either the
+     *         left byte estimate or the right row estimate is unavailable
+     */
+    @Override
+    public Cost getCost() {
+        Long lhsByteCount = null;
+        try {
+            lhsByteCount = delegate.getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate yields an unknown cost below.
+        }
+        Long rhsRowCount = null;
+        try {
+            rhsRowCount = rhs.getEstimatedRowsToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate yields an unknown cost below.
+        }
+
+        if (lhsByteCount == null || rhsRowCount == null) {
+            return Cost.UNKNOWN;
+        }
+
+        // Multiply in double arithmetic: the product of two long estimates
+        // can exceed Long.MAX_VALUE and would otherwise silently wrap.
+        double correlateIo = lhsByteCount.doubleValue() * rhsRowCount;
+        Cost cost = new Cost(0, 0, correlateIo);
+        Cost lhsCost = delegate.getCost();
+        return cost.plus(lhsCost).plus(rhs.getCost());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3c62c5b..3da06db 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -59,6 +60,11 @@ public abstract class DelegateQueryPlan implements QueryPlan 
{
     }
 
     @Override
+    public Cost getCost() {
+        // Forwarded unchanged to the delegate plan.
+        return delegate.getCost();
+    }
+
+    @Override
     public TableRef getTableRef() {
         return delegate.getTableRef();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2b90dcb..2d2ff4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
@@ -290,6 +291,34 @@ public class HashJoinPlan extends DelegateQueryPlan {
         return statement;
     }
 
+    /**
+     * Estimates the cost of this hash join: an I/O term for the estimated
+     * bytes to scan, plus the delegate plan's cost (scaled down when key
+     * range expressions are present) and the cost of every sub-plan.
+     *
+     * @return the estimated cost, or {@link Cost#UNKNOWN} when no byte
+     *         estimate is available
+     */
+    @Override
+    public Cost getCost() {
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate is reported as unknown below.
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost joinCost = new Cost(0, 0, bytesToScan);
+        Cost leftCost = delegate.getCost();
+        if (keyRangeExpressions != null) {
+            // The selectivity of the dynamic rowkey filter.
+            // TODO replace the constant with an estimate value.
+            final double filterSelectivity = 0.01;
+            leftCost = leftCost.multiplyBy(filterSelectivity);
+        }
+        Cost subPlansCost = Cost.ZERO;
+        for (SubPlan sub : subPlans) {
+            Cost innerCost = sub.getInnerPlan().getCost();
+            subPlansCost = subPlansCost.plus(innerCost);
+        }
+        return joinCost.plus(leftCost).plus(subPlansCost);
+    }
+
     protected interface SubPlan {
         public ServerCache execute(HashJoinPlan parent) throws SQLException;
         public void postProcess(ServerCache result, HashJoinPlan parent) 
throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 4470947..c9abb69 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -60,6 +61,11 @@ public class LiteralResultIterationPlan extends 
BaseQueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // This plan always reports the zero cost.
+        return Cost.ZERO;
+    }
+
+    @Override
     public List<KeyRange> getSplits() {
         return Collections.emptyList();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index af25bff..d63950c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.SerialIterators;
 import org.apache.phoenix.iterate.SpoolingResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -65,6 +66,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.CostUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
@@ -193,6 +195,29 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // Cost is the estimated bytes to scan as I/O, plus an ORDER BY cost
+        // when the plan has ORDER BY expressions. Unknown when no byte
+        // estimate is available.
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate is reported as unknown below.
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost scanCost = new Cost(0, 0, bytesToScan);
+        int parallelism = CostUtil.estimateParallelLevel(
+                true, context.getConnection().getQueryServices());
+        if (orderBy.getOrderByExpressions().isEmpty()) {
+            return scanCost;
+        }
+        return scanCost.plus(
+                CostUtil.estimateOrderByCost(bytesToScan, parallelism));
+    }
+
+    @Override
     public List<KeyRange> getSplits() {
         if (splits == null)
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index fab7c59..3e380da 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -54,6 +54,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.query.KeyRange;
@@ -192,6 +193,23 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // An I/O term for the estimated bytes to scan, plus the costs of
+        // the left and right input plans. Unknown when no byte estimate is
+        // available.
+        Long bytesToScan;
+        try {
+            bytesToScan = getEstimatedBytesToScan();
+        } catch (SQLException ignored) {
+            // Best effort: a failed estimate is reported as unknown below.
+            bytesToScan = null;
+        }
+        if (bytesToScan == null) {
+            return Cost.UNKNOWN;
+        }
+
+        Cost ownCost = new Cost(0, 0, bytesToScan);
+        Cost withLhs = ownCost.plus(lhsPlan.getCost());
+        return withLhs.plus(rhsPlan.getCost());
+    }
+
+    @Override
     public StatementContext getContext() {
         return context;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index e06522f..e6bf654 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.UnionResultIterators;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
@@ -210,6 +211,15 @@ public class UnionPlan implements QueryPlan {
     }
 
     @Override
+    public Cost getCost() {
+        // Sum of the costs of all sub-plans, starting from the zero cost.
+        Cost total = Cost.ZERO;
+        for (QueryPlan subPlan : plans) {
+            Cost subPlanCost = subPlan.getCost();
+            total = total.plus(subPlanCost);
+        }
+        return total;
+    }
+
+    @Override
     public ParameterMetaData getParameterMetaData() {
         return paramMetaData;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f418bc9..94aeb34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.AddColumnStatement;
 import org.apache.phoenix.parse.AddJarsStatement;
 import org.apache.phoenix.parse.AliasedNode;
@@ -644,6 +645,11 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 }
 
                 @Override
+                public Cost getCost() {
+                    // This plan always reports the zero cost.
+                    return Cost.ZERO;
+                }
+
+                @Override
                 public TableRef getTableRef() {
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java 
b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
new file mode 100644
index 0000000..b83f354
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/Cost.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.optimize;
+
+import java.util.Objects;
+
+/**
+ * Optimizer cost in terms of CPU, memory, and I/O usage. The unit of each
+ * component is currently the number of bytes processed.
+ *
+ * <p>Ordering and equality consider only the I/O component (see
+ * {@link #compareTo(Cost)}), and {@link #hashCode()} is defined to match.
+ * {@link #UNKNOWN} is recognised by identity and sorts after every other
+ * cost.
+ */
+public class Cost implements Comparable<Cost> {
+    /** The unknown cost. Final because {@link #isUnknown()} uses identity. */
+    public static final Cost UNKNOWN =
+            new Cost(Double.NaN, Double.NaN, Double.NaN) {
+        @Override
+        public String toString() {
+            return "{unknown}";
+        }
+    };
+
+    /** The zero cost. */
+    public static final Cost ZERO = new Cost(0, 0, 0) {
+        @Override
+        public String toString() {
+            return "{zero}";
+        }
+    };
+
+    private final double cpu;
+    private final double memory;
+    private final double io;
+
+    /**
+     * Creates a cost from its three components.
+     *
+     * @param cpu the CPU component
+     * @param memory the memory component
+     * @param io the I/O component
+     */
+    public Cost(double cpu, double memory, double io) {
+        this.cpu = cpu;
+        this.memory = memory;
+        this.io = io;
+    }
+
+    /** @return the CPU component */
+    public double getCpu() {
+        return cpu;
+    }
+
+    /** @return the memory component */
+    public double getMemory() {
+        return memory;
+    }
+
+    /** @return the I/O component */
+    public double getIo() {
+        return io;
+    }
+
+    /** @return true only for the {@link #UNKNOWN} instance itself */
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    /**
+     * Adds two costs component by component.
+     *
+     * @param other the cost to add
+     * @return the sum, or {@link #UNKNOWN} if either operand is unknown
+     */
+    public Cost plus(Cost other) {
+        if (isUnknown() || other.isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu + other.cpu,
+                this.memory + other.memory,
+                this.io + other.io);
+    }
+
+    /**
+     * Scales every component by the given factor.
+     *
+     * @param factor the multiplier
+     * @return the scaled cost, or {@link #UNKNOWN} if this cost is unknown
+     */
+    public Cost multiplyBy(double factor) {
+        if (isUnknown()) {
+            return UNKNOWN;
+        }
+
+        return new Cost(
+                this.cpu * factor,
+                this.memory * factor,
+                this.io * factor);
+    }
+
+    /**
+     * Orders costs by their I/O component; {@link #UNKNOWN} sorts last.
+     *
+     * <p>TODO right now for simplicity, we choose to ignore CPU and memory
+     * costs. We may take those into account as our cost model matures.
+     */
+    @Override
+    public int compareTo(Cost other) {
+        boolean thisUnknown = isUnknown();
+        boolean otherUnknown = other.isUnknown();
+        if (thisUnknown || otherUnknown) {
+            if (thisUnknown && otherUnknown) {
+                return 0;
+            }
+            return thisUnknown ? 1 : -1;
+        }
+
+        // Double.compare keeps the ordering total: subtracting two infinite
+        // values yields NaN, which made a cost compare as "less" than itself.
+        return Double.compare(this.io, other.io);
+    }
+
+    /** Two costs are equal when {@link #compareTo(Cost)} returns zero. */
+    @Override
+    public boolean equals(Object obj) {
+        return this == obj
+                || (obj instanceof Cost && this.compareTo((Cost) obj) == 0);
+    }
+
+    /**
+     * Hashes only the I/O component, the only field that {@link #equals}
+     * takes into account, so that equal costs always hash alike.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(io);
+    }
+
+    @Override
+    public String toString() {
+        return "{cpu: " + cpu + ", memory: " + memory + ", io: " + io + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 028fc94..8481bc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -69,11 +69,13 @@ public class QueryOptimizer {
 
     private final QueryServices services;
     private final boolean useIndexes;
+    private final boolean costBased;
     private long indexPendingDisabledThreshold;
 
     public QueryOptimizer(QueryServices services) {
         this.services = services;
         this.useIndexes = 
this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, 
QueryServicesOptions.DEFAULT_USE_INDEXES);
+        this.costBased = 
this.services.getProps().getBoolean(QueryServices.COST_BASED_OPTIMIZER_ENABLED, 
QueryServicesOptions.DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
         this.indexPendingDisabledThreshold = 
this.services.getProps().getLong(QueryServices.INDEX_PENDING_DISABLE_THRESHOLD,
             QueryServicesOptions.DEFAULT_INDEX_PENDING_DISABLE_THRESHOLD);
     }
@@ -96,7 +98,7 @@ public class QueryOptimizer {
     }
     
     public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, 
List<? extends PDatum> targetColumns, ParallelIteratorFactory 
parallelIteratorFactory) throws SQLException {
-        List<QueryPlan>plans = getApplicablePlans(dataPlan, statement, 
targetColumns, parallelIteratorFactory, true);
+        List<QueryPlan> plans = getApplicablePlans(dataPlan, statement, 
targetColumns, parallelIteratorFactory, true);
         return plans.get(0);
     }
     
@@ -329,7 +331,8 @@ public class QueryOptimizer {
 
     /**
      * Order the plans among all the possible ones from best to worst.
-     * Since we don't keep stats yet, we use the following simple algorithm:
+     * If option COST_BASED_OPTIMIZER_ENABLED is on and stats are available, 
we order the plans based on
+     * their costs, otherwise we use the following simple algorithm:
      * 1) If the query is a point lookup (i.e. we have a set of exact row 
keys), choose that one immediately.
      * 2) If the query has an ORDER BY and a LIMIT, choose the plan that has 
all the ORDER BY expression
      * in the same order as the row key columns.
@@ -337,9 +340,6 @@ public class QueryOptimizer {
      *    a) the most row key columns that may be used to form the start/stop 
scan key (i.e. bound slots).
      *    b) the plan that preserves ordering for a group by.
      *    c) the non local index table plan
-     * TODO: We should make more of a cost based choice: The largest number of 
bound slots does not necessarily
-     * correspond to the least bytes scanned. We could consider the slots 
bound for upper and lower ranges 
-     * separately, or we could calculate the bytes scanned between the start 
and stop row of each table.
      * @param plans the list of candidate plans
      * @return list of plans ordered from best to worst.
      */
@@ -348,7 +348,21 @@ public class QueryOptimizer {
         if (plans.size() == 1) {
             return plans;
         }
-        
+
+        if (this.costBased) {
+            Collections.sort(plans, new Comparator<QueryPlan>() {
+                @Override
+                public int compare(QueryPlan plan1, QueryPlan plan2) {
+                    return plan1.getCost().compareTo(plan2.getCost());
+                }
+            });
+            // Return ordered list based on cost if stats are available; 
otherwise fall
+            // back to static ordering.
+            if (!plans.get(0).getCost().isUnknown()) {
+                return stopAtBestPlan ? plans.subList(0, 1) : plans;
+            }
+        }
+
         /**
          * If we have a plan(s) that are just point lookups (i.e. fully 
qualified row
          * keys), then favor those first.
@@ -448,7 +462,7 @@ public class QueryOptimizer {
             }
             
         });
-        
+
         return stopAtBestPlan ? bestCandidates.subList(0, 1) : bestCandidates;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 56d6e06..9dd3074 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -290,6 +290,8 @@ public interface QueryServices extends SQLCloseable {
     //Update Cache Frequency default config attribute
     public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB  = 
"phoenix.default.update.cache.frequency";
 
+    // Whether to enable cost-based-decision in the query optimizer
+    public static final String COST_BASED_OPTIMIZER_ENABLED = 
"phoenix.costbased.optimizer.enabled";
     public static final String SMALL_SCAN_THRESHOLD_ATTRIB = 
"phoenix.query.smallScanThreshold";
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 33a319b..11d9784 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
+import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
@@ -329,6 +330,8 @@ public class QueryServicesOptions {
     // RS -> RS calls for upsert select statements are disabled by default
     public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = false;
 
+    public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false;
+
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
@@ -403,7 +406,8 @@ public class QueryServicesOptions {
             .setIfUnset(PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD, DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD)
             .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
             .setIfUnset(STATS_COLLECTION_ENABLED, DEFAULT_STATS_COLLECTION_ENABLED)
-            .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION);
+            .setIfUnset(USE_STATS_FOR_PARALLELIZATION, DEFAULT_USE_STATS_FOR_PARALLELIZATION)
+            .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, DEFAULT_COST_BASED_OPTIMIZER_ENABLED);
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
         // it to 1, so we'll change it.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
new file mode 100644
index 0000000..1d4b8e0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CostUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.optimize.Cost;
+import org.apache.phoenix.query.QueryServices;
+
+/**
+ * Utilities for computing costs.
+ *
+ * Some of the methods here should eventually be replaced by a metadata framework which
+ * estimates output metrics for each QueryPlan or operation, e.g. row count, byte count,
+ * etc.
+ */
+public class CostUtil {
+
+    // An estimate of the ratio of result data from group-by against the input data.
+    private final static double GROUPING_FACTOR = 0.1;
+
+    // Io operations conducted in intermediate evaluations like sorting or aggregation
+    // should be counted twice since they usually involve both read and write.
+    private final static double IO_COST_MULTIPLIER = 2.0;
+
+    /**
+     * Estimate the number of output bytes of an aggregate.
+     * @param byteCount the number of input bytes
+     * @param groupBy the compiled GroupBy object
+     * @param aggregatorsSize the byte size of aggregators
+     * @return the output byte count
+     */
+    public static double estimateAggregateOutputBytes(
+            double byteCount, GroupBy groupBy, int aggregatorsSize) {
+        if (groupBy.isUngroupedAggregate()) {
+            return aggregatorsSize;
+        }
+        return byteCount * GROUPING_FACTOR;
+    }
+
+    /**
+     * Estimate the cost of an aggregate.
+     * @param byteCount the number of input bytes
+     * @param groupBy the compiled GroupBy object
+     * @param aggregatorsSize the byte size of aggregators
+     * @param parallelLevel number of parallel workers or threads
+     * @return the cost
+     */
+    public static Cost estimateAggregateCost(
+            double byteCount, GroupBy groupBy, int aggregatorsSize, int parallelLevel) {
+        double outputBytes = estimateAggregateOutputBytes(byteCount, groupBy, aggregatorsSize);
+        double orderedFactor = groupBy.isOrderPreserving() ? 0.2 : 1.0;
+        return new Cost(0, 0, outputBytes * orderedFactor * IO_COST_MULTIPLIER / parallelLevel);
+    }
+
+    /**
+     * Estimate the cost of an order-by
+     * @param byteCount the number of input bytes
+     * @param parallelLevel number of parallel workers or threads
+     * @return the cost
+     */
+    public static Cost estimateOrderByCost(double byteCount, int parallelLevel) {
+        return new Cost(0, 0, byteCount * IO_COST_MULTIPLIER / parallelLevel);
+    }
+
+    /**
+     * Estimate the parallel level of an operation
+     * @param runningOnServer if the operation will be running on server side
+     * @param services the QueryServices object
+     * @return the parallel level
+     */
+    public static int estimateParallelLevel(boolean runningOnServer, QueryServices services) {
+        // TODO currently return constants for simplicity, should derive from cluster config.
+        return runningOnServer ? 10 : 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/541d6ac2/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index cb34d2b..1903dda 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
@@ -486,6 +487,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public Long getEstimateInfoTimestamp() throws SQLException {
                 return null;
             }
+
+            @Override
+            public Cost getCost() {
+                return Cost.ZERO;
+            }
             
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();

Reply via email to