Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 cdca9377e -> bb8d7664f
PHOENIX-1779 Parallelize fetching of next batch of records for scans corresponding to queries with no order by Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bb8d7664 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bb8d7664 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bb8d7664 Branch: refs/heads/4.x-HBase-0.98 Commit: bb8d7664fc5094356b83100648877420e6d32874 Parents: cdca937 Author: Samarth <[email protected]> Authored: Fri Apr 17 11:27:32 2015 -0700 Committer: Samarth <[email protected]> Committed: Fri Apr 17 11:27:32 2015 -0700 ---------------------------------------------------------------------- .../end2end/SkipScanAfterManualSplitIT.java | 2 +- .../iterate/RoundRobinResultIteratorIT.java | 319 ++++++++++++++++++ .../apache/phoenix/mapreduce/IndexToolIT.java | 3 +- .../org/apache/phoenix/compile/QueryPlan.java | 11 + .../apache/phoenix/compile/TraceQueryPlan.java | 5 + .../apache/phoenix/execute/AggregatePlan.java | 5 + .../apache/phoenix/execute/BaseQueryPlan.java | 6 + .../phoenix/execute/DegenerateQueryPlan.java | 5 + .../phoenix/execute/DelegateQueryPlan.java | 6 + .../org/apache/phoenix/execute/ScanPlan.java | 21 +- .../phoenix/execute/SortMergeJoinPlan.java | 5 + .../org/apache/phoenix/execute/UnionPlan.java | 5 + .../iterate/RoundRobinResultIterator.java | 329 +++++++++++++++++++ .../apache/phoenix/jdbc/PhoenixResultSet.java | 8 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 5 + .../phoenix/mapreduce/PhoenixRecordReader.java | 9 +- .../org/apache/phoenix/query/QueryServices.java | 6 + .../phoenix/query/QueryServicesOptions.java | 19 +- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../java/org/apache/phoenix/util/ScanUtil.java | 22 ++ .../phoenix/compile/QueryCompilerTest.java | 62 +++- .../phoenix/filter/SkipScanBigFilterTest.java | 2 +- .../query/ParallelIteratorsSplitTest.java | 5 + .../phoenix/query/QueryServicesTestImpl.java | 5 +- 24 files 
changed, 841 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java index a30a668..6d08202 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java @@ -69,7 +69,7 @@ public class SkipScanAfterManualSplitIT extends BaseHBaseManagedTimeIT { // props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64)); props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32)); // enables manual splitting on salted tables - props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false)); + props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000)); props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java new file mode 100644 index 0000000..1e3db11 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class RoundRobinResultIteratorIT 
extends BaseHBaseManagedTimeIT { + + private static final int NUM_SALT_BUCKETS = 4; + + @BeforeClass + @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(32)); + /* + * Don't force row key order. This causes RoundRobinResultIterator to be used if there was no order by specified + * on the query. + */ + props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(1000)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testRoundRobinAfterTableSplit() throws Exception { + String tableName = "ROUNDROBINSPLIT"; + byte[] tableNameBytes = Bytes.toBytes(tableName); + int numRows = setupTableForSplit(tableName); + Connection conn = DriverManager.getConnection(getUrl()); + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + int nRegions = services.getAllTableRegions(tableNameBytes).size(); + int nRegionsBeforeSplit = nRegions; + HBaseAdmin admin = services.getAdmin(); + try { + // Split is an async operation. So hoping 10 seconds is long enough time. + // If the test tends to flap, then you might want to increase the wait time + admin.split(tableName); + CountDownLatch latch = new CountDownLatch(1); + int nTries = 0; + long waitTimeMillis = 1000; + while (nRegions == nRegionsBeforeSplit && nTries < 10) { + latch.await(waitTimeMillis, TimeUnit.MILLISECONDS); + nRegions = services.getAllTableRegions(tableNameBytes).size(); + nTries++; + } + + String query = "SELECT * FROM " + tableName; + Statement stmt = conn.createStatement(); + stmt.setFetchSize(10); // this makes scanner caches to be replenished in parallel. 
+ ResultSet rs = stmt.executeQuery(query); + int numRowsRead = 0; + while (rs.next()) { + numRowsRead++; + } + nRegions = services.getAllTableRegions(tableNameBytes).size(); + // Region cache has been updated, as there are more regions now + assertNotEquals(nRegions, nRegionsBeforeSplit); + assertEquals(numRows, numRowsRead); + } finally { + admin.close(); + } + + } + + @Test + public void testSelectAllRowsWithDifferentFetchSizes_salted() throws Exception { + testSelectAllRowsWithDifferentFetchSizes(true); + } + + @Test + public void testSelectAllRowsWithDifferentFetchSizes_unsalted() throws Exception { + testSelectAllRowsWithDifferentFetchSizes(false); + } + + private void testSelectAllRowsWithDifferentFetchSizes(boolean salted) throws Exception { + String tableName = "ALLROWS" + (salted ? "_SALTED" : "_UNSALTED"); + int numRows = 9; + Set<String> expectedKeys = Collections.unmodifiableSet(createTableAndInsertRows(tableName, numRows, salted, false)); + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName); + tryWithFetchSize(new HashSet<>(expectedKeys), 1, stmt, 0); + tryWithFetchSize(new HashSet<>(expectedKeys), 2, stmt, salted ? 2 : 5); + tryWithFetchSize(new HashSet<>(expectedKeys), numRows - 1, stmt, salted ? 0 : 1); + tryWithFetchSize(new HashSet<>(expectedKeys), numRows, stmt, salted ? 0 : 1); + tryWithFetchSize(new HashSet<>(expectedKeys), numRows + 1, stmt, salted ? 
0 : 1); + tryWithFetchSize(new HashSet<>(expectedKeys), numRows + 2, stmt, 0); + } + + @Test + public void testSelectRowsWithFilterAndDifferentFetchSizes_unsalted() throws Exception { + testSelectRowsWithFilterAndDifferentFetchSizes(false); + } + + @Test + public void testSelectRowsWithFilterAndDifferentFetchSizes_salted() throws Exception { + testSelectRowsWithFilterAndDifferentFetchSizes(true); + } + + private void testSelectRowsWithFilterAndDifferentFetchSizes(boolean salted) throws Exception { + String tableName = "ROWSWITHFILTER" + (salted ? "_SALTED" : "_UNSALTED"); + int numRows = 6; + Set<String> insertedKeys = createTableAndInsertRows(tableName, numRows, salted, false); + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K = ?"); + stmt.setString(1, "key1"); // will return only 1 row + int numRowsFiltered = 1; + tryWithFetchSize(Sets.newHashSet("key1"), 1, stmt, 0); + tryWithFetchSize(Sets.newHashSet("key1"), 2, stmt, salted ? 1 : 1); + tryWithFetchSize(Sets.newHashSet("key1"), 3, stmt, 0); + + stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?"); + stmt.setString(1, "key2"); + insertedKeys.remove("key1"); + insertedKeys.remove("key2"); // query should return 4 rows after key2. + numRowsFiltered = 4; + tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0); + tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, salted ? 1 : 2); + tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered - 1, stmt, salted ? 0 : 1); + tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered, stmt, salted ? 0 : 1); + tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered + 1, stmt, salted ? 
0 : 1); + tryWithFetchSize(new HashSet<>(insertedKeys), numRowsFiltered + 2, stmt, 0); + + stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?"); + stmt.setString(1, "key6"); + insertedKeys.clear(); // query should return no rows; + tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0); + tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, 0); + tryWithFetchSize(new HashSet<>(insertedKeys), numRows - 1, stmt, 0); + tryWithFetchSize(new HashSet<>(insertedKeys), numRows, stmt, 0); + tryWithFetchSize(new HashSet<>(insertedKeys), numRows + 1, stmt, 0); + } + + private Set<String> createTableAndInsertRows(String tableName, int numRows, boolean salted, boolean addTableNameToKey) throws Exception { + String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + (salted ? "SALT_BUCKETS=" + NUM_SALT_BUCKETS : ""); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(ddl); + String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(dml); + final Set<String> expectedKeys = new HashSet<>(numRows); + for (int i = 1; i <= numRows; i++) { + String key = (addTableNameToKey ? 
tableName : "") + ("key" + i); + expectedKeys.add(key); + stmt.setString(1, key); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + conn.commit(); + return expectedKeys; + } + + @Test + public void testFetchSizesAndRVCExpression() throws Exception { + String tableName = "RVCTest"; + Set<String> insertedKeys = Collections.unmodifiableSet(createTableAndInsertRows(tableName, 4, false, false)); + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement("SELECT K FROM " + tableName + " WHERE (K, V) > (?, ?)"); + stmt.setString(1, "key0"); + stmt.setString(2, "value0"); + tryWithFetchSize(new HashSet<>(insertedKeys), 1, stmt, 0); + tryWithFetchSize(new HashSet<>(insertedKeys), 2, stmt, 2); + tryWithFetchSize(new HashSet<>(insertedKeys), 3, stmt, 1); + tryWithFetchSize(new HashSet<>(insertedKeys), 4, stmt, 1); + } + + private static void tryWithFetchSize(Set<String> expectedKeys, int fetchSize, PreparedStatement stmt, int numFetches) throws Exception { + stmt.setFetchSize(fetchSize); + ResultSet rs = stmt.executeQuery(); + int expectedNumRows = expectedKeys.size(); + int numRows = 0; + while (rs.next()) { + expectedKeys.remove(rs.getString(1)); + numRows ++; + } + assertEquals("Number of rows didn't match", expectedNumRows, numRows); + assertTrue("Not all rows were returned for fetch size: " + fetchSize + " - " + expectedKeys, expectedKeys.size() == 0); + assertRoundRobinBehavior(rs, stmt, numFetches); + } + + private static int setupTableForSplit(String tableName) throws Exception { + int batchSize = 25; + int maxFileSize = 1024 * 10; + int payLoadSize = 1024; + String payload; + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < payLoadSize; i++) { + buf.append('a'); + } + payload = buf.toString(); + + int MIN_CHAR = 'a'; + int MAX_CHAR = 'z'; + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + "(" + + "a VARCHAR PRIMARY KEY, b 
VARCHAR) " + + HTableDescriptor.MAX_FILESIZE + "=" + maxFileSize + "," + + " SALT_BUCKETS = " + NUM_SALT_BUCKETS); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)"); + int rowCount = 0; + for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) { + for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) { + String pk = Character.toString((char)c1) + Character.toString((char)c2); + stmt.setString(1, pk); + stmt.setString(2, payload); + stmt.execute(); + rowCount++; + if (rowCount % batchSize == 0) { + conn.commit(); + } + } + } + conn.commit(); + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HBaseAdmin admin = services.getAdmin(); + try { + admin.flush(tableName); + } finally { + admin.close(); + } + conn.close(); + return rowCount; + } + + @Test + public void testUnionAllSelects() throws Exception { + int insertedRowsA = 10; + int insertedRowsB = 5; + int insertedRowsC = 7; + Set<String> keySetA = createTableAndInsertRows("TABLEA", insertedRowsA, true, true); + Set<String> keySetB = createTableAndInsertRows("TABLEB", insertedRowsB, true, true); + Set<String> keySetC = createTableAndInsertRows("TABLEC", insertedRowsC, false, true); + String query = "SELECT K FROM TABLEA UNION ALL SELECT K FROM TABLEB UNION ALL SELECT K FROM TABLEC"; + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement stmt = conn.prepareStatement(query); + stmt.setFetchSize(2); // force parallel fetch of scanner cache + ResultSet rs = stmt.executeQuery(); + int rowsA = 0, rowsB = 0, rowsC = 0; + while (rs.next()) { + String key = rs.getString(1); + if (key.startsWith("TABLEA")) { + rowsA++; + } else if (key.startsWith("TABLEB")) { + rowsB++; + } else if (key.startsWith("TABLEC")) { + rowsC++; + } + keySetA.remove(key); + keySetB.remove(key); + keySetC.remove(key); + } + assertEquals("Not all rows of tableA were returned", 0, keySetA.size()); + assertEquals("Not all rows of tableB were returned", 0, 
keySetB.size()); + assertEquals("Not all rows of tableC were returned", 0, keySetC.size()); + assertEquals("Number of rows retrieved didn't match for tableA", insertedRowsA, rowsA); + assertEquals("Number of rows retrieved didnt match for tableB", insertedRowsB, rowsB); + assertEquals("Number of rows retrieved didn't match for tableC", insertedRowsC, rowsC); + } + + + private static ResultIterator getResultIterator(ResultSet rs) throws SQLException { + return rs.unwrap(PhoenixResultSet.class).getUnderlyingIterator(); + } + + private static void assertRoundRobinBehavior(ResultSet rs, Statement stmt, int numFetches) throws SQLException { + ResultIterator itr = getResultIterator(rs); + if (stmt.getFetchSize() > 1) { + assertTrue(itr instanceof RoundRobinResultIterator); + RoundRobinResultIterator roundRobinItr = (RoundRobinResultIterator)itr; + assertEquals(numFetches, roundRobinItr.getNumberOfParallelFetches()); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java index 2b7b16b..6761275 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java @@ -249,8 +249,7 @@ public class IndexToolIT { if(isLocal) { final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable); expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]" - + "\n SERVER FILTER BY FIRST KEY ONLY" - + "\nCLIENT MERGE SORT", localIndexName); + + "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName); } else { expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s" + "\n SERVER FILTER BY FIRST KEY 
ONLY",indxTable); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index a76993c..d0c63fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -20,6 +20,7 @@ package org.apache.phoenix.compile; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -69,4 +70,14 @@ public interface QueryPlan extends StatementPlan { public boolean isDegenerate(); public boolean isRowKeyOrdered(); + + /** + * + * @return whether underlying {@link ResultScanner} can be picked up in a round-robin + * fashion. Generally, selecting scanners in such a fashion is possible if rows don't + * have to be returned back in a certain order. 
+ * @throws SQLException + */ + public boolean useRoundRobinIterator() throws SQLException; + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 815ac1e..12360f5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -232,4 +232,9 @@ public class TraceQueryPlan implements QueryPlan { public boolean isRowKeyOrdered() { return false; } + + @Override + public boolean useRoundRobinIterator() { + return false; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 4f344b6..ba137f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -210,4 +210,9 @@ public class AggregatePlan extends BaseQueryPlan { } return resultScanner; } + + @Override + public boolean useRoundRobinIterator() throws SQLException { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
index 94233c8..c4e0345 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.execute; +import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY; +import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY; + import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -51,6 +54,8 @@ import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -393,4 +398,5 @@ public abstract class BaseQueryPlan implements QueryPlan { public boolean isRowKeyOrdered() { return groupBy.isEmpty() ? 
orderBy.getOrderByExpressions().isEmpty() : groupBy.isOrderPreserving(); } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java index 70e59b9..fda53ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java @@ -55,4 +55,9 @@ public class DegenerateQueryPlan extends BaseQueryPlan { return null; } + @Override + public boolean useRoundRobinIterator() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java index 4d50ba0..7026433 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java @@ -18,6 +18,7 @@ package org.apache.phoenix.execute; import java.sql.ParameterMetaData; +import java.sql.SQLException; import java.util.List; import org.apache.hadoop.hbase.client.Scan; @@ -101,5 +102,10 @@ public abstract class DelegateQueryPlan implements QueryPlan { public boolean isRowKeyOrdered() { return delegate.isRowKeyOrdered(); } + + @Override + public boolean useRoundRobinIterator() throws SQLException { + return delegate.useRoundRobinIterator(); + } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index d0a71f4..884d835 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -18,6 +18,8 @@ package org.apache.phoenix.execute; +import static org.apache.phoenix.util.ScanUtil.shouldRowsBeInRowKeyOrder; + import java.sql.SQLException; import java.util.List; @@ -38,6 +40,7 @@ import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.iterate.ParallelIterators; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.ResultIterators; +import org.apache.phoenix.iterate.RoundRobinResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.SerialIterators; import org.apache.phoenix.iterate.SpoolingResultIterator; @@ -54,6 +57,7 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.util.LogUtil; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +132,7 @@ public class ScanPlan extends BaseQueryPlan { private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, TableRef table, OrderBy orderBy, Integer limit, boolean allowPageFilter) throws SQLException { - if (isSerial(context, table, orderBy, limit, allowPageFilter)) { + if (isSerial(context, table, orderBy, limit, allowPageFilter) || ScanUtil.isRoundRobinPossible(orderBy, context)) { return 
ParallelIteratorFactory.NOOP_FACTORY; } ParallelIteratorFactory spoolingResultIteratorFactory = @@ -182,13 +186,10 @@ public class ScanPlan extends BaseQueryPlan { if (isOrdered) { scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions()); } else { - if ((isSalted || table.getIndexType() == IndexType.LOCAL) && - (context.getConnection().getQueryServices().getProps().getBoolean( - QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, - QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) || - orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || - orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)) { // ORDER BY was optimized out b/c query is in row key order + if ((isSalted || table.getIndexType() == IndexType.LOCAL) && shouldRowsBeInRowKeyOrder(orderBy, context)) { scanner = new MergeSortRowKeyResultIterator(iterators, isSalted ? SaltingUtil.NUM_SALTING_BYTES : 0, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY); + } else if (useRoundRobinIterator()) { + scanner = new RoundRobinResultIterator(iterators, this); } else { scanner = new ConcatResultIterator(iterators); } @@ -202,4 +203,10 @@ public class ScanPlan extends BaseQueryPlan { } return scanner; } + + @Override + public boolean useRoundRobinIterator() throws SQLException { + return ScanUtil.isRoundRobinPossible(orderBy, context); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index ce01b67..01e87e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -631,6 +631,11 @@ public class SortMergeJoinPlan implements QueryPlan { } 
} + + @Override + public boolean useRoundRobinIterator() { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java index 973f37e..031b58b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java @@ -186,5 +186,10 @@ public class UnionPlan implements QueryPlan { public List<QueryPlan> getPlans() { return this.plans; } + + @Override + public boolean useRoundRobinIterator() throws SQLException { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java new file mode 100644 index 0000000..4a9ad3e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; + +/** + * ResultIterator that keeps track of the number of records fetched by each {@link PeekingResultIterator} making sure it + * asks for records from each iterator in a round-robin fashion. When the iterators have fetched the scan cache size of + * records, it submits the iterators to the thread pool to help parallelize the I/O needed to fetch the next batch of + * records. This iterator assumes that the PeekingResultIterators that it manages are not nested i.e. they directly + * manage the underlying scanners. This kind of ResultIterator should only be used when one doesn't care about the order + * in which records are returned. 
+ */ +public class RoundRobinResultIterator implements ResultIterator { + + private static final Logger logger = LoggerFactory.getLogger(RoundRobinResultIterator.class); + + private final int threshold; + + private int numScannersCacheExhausted = 0; + private ResultIterators resultIterators; + + private List<RoundRobinIterator> openIterators = new ArrayList<>(); + + private int index; + private boolean closed; + private final QueryPlan plan; + + // For testing purposes + private int numParallelFetches; + + public RoundRobinResultIterator(ResultIterators iterators, QueryPlan plan) { + this.resultIterators = iterators; + this.plan = plan; + this.threshold = getThreshold(); + } + + public RoundRobinResultIterator(List<PeekingResultIterator> iterators, QueryPlan plan) { + this.resultIterators = null; + this.plan = plan; + this.threshold = getThreshold(); + initOpenIterators(wrapToRoundRobinIterators(iterators)); + } + + public static ResultIterator newIterator(final List<PeekingResultIterator> iterators, QueryPlan plan) { + if (iterators.isEmpty()) { return EMPTY_ITERATOR; } + if (iterators.size() == 1) { return iterators.get(0); } + return new RoundRobinResultIterator(iterators, plan); + } + + @Override + public Tuple next() throws SQLException { + List<RoundRobinIterator> iterators; + int size; + while ((size = (iterators = getIterators()).size()) > 0) { + index = index % size; + RoundRobinIterator itr = iterators.get(index); + if (itr.getNumRecordsRead() < threshold) { + Tuple tuple; + if ((tuple = itr.peek()) != null) { + tuple = itr.next(); + if (itr.getNumRecordsRead() == threshold) { + numScannersCacheExhausted++; + } + index = (index + 1) % size; + return tuple; + } else { + // The underlying scanner is exhausted. Close the iterator and un-track it. 
+ itr.close(); + iterators.remove(index); + if (iterators.size() == 0) { + close(); + } + } + } else { + index = (index + 1) % size; + } + } + return null; + } + + @Override + public void close() throws SQLException { + if (closed) { return; } + closed = true; + SQLException toThrow = null; + try { + if (resultIterators != null) { + resultIterators.close(); + } + } catch (Exception e) { + toThrow = ServerUtil.parseServerException(e); + } finally { + try { + if (openIterators.size() > 0) { + for (RoundRobinIterator itr : openIterators) { + try { + itr.close(); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } + } + } + } finally { + if (toThrow != null) { throw toThrow; } + } + } + } + + @Override + public void explain(List<String> planSteps) { + if (resultIterators != null) { + resultIterators.explain(planSteps); + } + } + + @VisibleForTesting + int getNumberOfParallelFetches() { + return numParallelFetches; + } + + @VisibleForTesting + QueryPlan getQueryPlan() { + return plan; + } + + private List<RoundRobinIterator> getIterators() throws SQLException { + if (closed) { return Collections.emptyList(); } + if (openIterators.size() > 0 && openIterators.size() == numScannersCacheExhausted) { + /* + * All the scanners have exhausted their cache. Submit the scanners back to the pool so that they can fetch + * the next batch of records in parallel. 
+ */ + initOpenIterators(fetchNextBatch()); + } else if (openIterators.size() == 0 && resultIterators != null) { + List<PeekingResultIterator> iterators = resultIterators.getIterators(); + initOpenIterators(wrapToRoundRobinIterators(iterators)); + } + return openIterators; + } + + private List<RoundRobinIterator> wrapToRoundRobinIterators(List<PeekingResultIterator> iterators) { + List<RoundRobinIterator> roundRobinItrs = new ArrayList<>(iterators.size()); + for (PeekingResultIterator itr : iterators) { + roundRobinItrs.add(new RoundRobinIterator(itr, null)); + } + return roundRobinItrs; + } + + private void initOpenIterators(List<RoundRobinIterator> iterators) { + openIterators.clear(); + openIterators.addAll(iterators); + index = 0; + numScannersCacheExhausted = 0; + } + + private int getThreshold() { + int cacheSize = getScannerCacheSize(); + checkArgument(cacheSize > 1, "RoundRobinResultIterator doesn't work when cache size is less than or equal to 1"); + return cacheSize - 1; + } + + private int getScannerCacheSize() { + try { + return plan.getContext().getStatement().getFetchSize(); + } catch (Throwable e) { + Throwables.propagate(e); + } + return -1; // unreachable + } + + private List<RoundRobinIterator> fetchNextBatch() throws SQLException { + int numExpectedIterators = openIterators.size(); + List<Future<Tuple>> futures = new ArrayList<>(numExpectedIterators); + List<RoundRobinIterator> results = new ArrayList<>(); + + // Randomize the order in which we will be hitting region servers to try not to overload any particular region server. 
+ Collections.shuffle(openIterators); + boolean success = false; + SQLException toThrow = null; + try { + StatementContext context = plan.getContext(); + final ConnectionQueryServices services = context.getConnection().getQueryServices(); + ExecutorService executor = services.getExecutor(); + numParallelFetches++; + if (logger.isDebugEnabled()) { + logger.debug("Performing parallel fetch for " + openIterators.size() + " iterators. "); + } + for (final RoundRobinIterator itr : openIterators) { + Future<Tuple> future = executor.submit(new Callable<Tuple>() { + @Override + public Tuple call() throws Exception { + // Read the next record to refill the scanner's cache. + return itr.next(); + } + }); + futures.add(future); + } + int i = 0; + for (Future<Tuple> future : futures) { + Tuple tuple = future.get(); + if (tuple != null) { + results.add(new RoundRobinIterator(openIterators.get(i), tuple)); + } else { + // Underlying scanner is exhausted. So close it. + openIterators.get(i).close(); + } + i++; + } + success = true; + return results; + } catch (SQLException e) { + toThrow = e; + } catch (Exception e) { + toThrow = ServerUtil.parseServerException(e); + } finally { + try { + if (!success) { + try { + close(); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } + } + } + } finally { + if (toThrow != null) { + FAILED_QUERY.increment(); + throw toThrow; + } + } + } + return null; // Not reachable + } + + /** + * Inner class that delegates to {@link PeekingResultIterator} keeping track the number of records it has read. Also + * keeps track of the tuple the {@link PeekingResultIterator} read in the previous next() call before it ran out of + * underlying scanner cache. 
+ */ + private class RoundRobinIterator implements PeekingResultIterator { + + private PeekingResultIterator delegate; + private Tuple tuple; + private int numRecordsRead; + + private RoundRobinIterator(PeekingResultIterator itr, Tuple tuple) { + this.delegate = itr; + this.tuple = tuple; + this.numRecordsRead = 0; + } + + @Override + public void close() throws SQLException { + delegate.close(); + } + + @Override + public Tuple next() throws SQLException { + if (tuple != null) { + Tuple t = tuple; + tuple = null; + return t; + } + numRecordsRead++; + return delegate.next(); + } + + @Override + public void explain(List<String> planSteps) { + delegate.explain(planSteps); + } + + @Override + public Tuple peek() throws SQLException { + if (tuple != null) { return tuple; } + return delegate.peek(); + } + + public int getNumRecordsRead() { + return numRecordsRead; + } + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 49e384c..8ee56ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -65,9 +65,10 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; -import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.SQLCloseable; +import com.google.common.annotations.VisibleForTesting; + /** @@ -1255,4 +1256,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho public <T> T getObject(String columnLabel, Class<T> type) throws SQLException { return 
(T) getObject(columnLabel); // Just ignore type since we only support built-in types } + + @VisibleForTesting + public ResultIterator getUnderlyingIterator() { + return scanner; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 465d84a..21b641b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -489,6 +489,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho public boolean isRowKeyOrdered() { return true; } + + @Override + public boolean useRoundRobinIterator() throws SQLException { + return false; + } }; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 2074658..eb6dc3d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LookAheadResultIterator; import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.RoundRobinResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import 
org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; @@ -100,11 +101,11 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null try { List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size()); for (Scan scan : scans) { - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan); + final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } - ResultIterator iterator = ConcatResultIterator.newIterator(iterators); + ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators); if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); } @@ -117,8 +118,8 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null Throwables.propagate(e); } } - - @Override + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (key == null) { key = NullWritable.get(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 23f3288..4a5b304 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -82,7 +82,10 @@ public interface QueryServices 
extends SQLCloseable { public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB = "phoenix.coprocessor.maxServerCacheTimeToLiveMs"; + + // Deprecated. Use FORCE_ROW_KEY_ORDER_ATTRIB instead. public static final String ROW_KEY_ORDER_SALTED_TABLE_ATTRIB = "phoenix.query.rowKeyOrderSaltedTable"; + public static final String USE_INDEXES_ATTRIB = "phoenix.query.useIndexes"; public static final String IMMUTABLE_ROWS_ATTRIB = "phoenix.mutate.immutableRows"; public static final String INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB = "phoenix.index.mutableBatchSizeThreshold"; @@ -158,6 +161,9 @@ public interface QueryServices extends SQLCloseable { // rpc queue configs public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count"; public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count"; + + public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder"; + /** * Get executor service used for parallel scans http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 3912ea5..cf62333 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -25,6 +25,7 @@ import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; import static 
org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB; +import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; @@ -100,7 +101,6 @@ public class QueryServicesOptions { public static final String DEFAULT_DATE_FORMAT_TIMEZONE = DateUtil.DEFAULT_TIME_ZONE_ID; public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true; public static final int DEFAULT_MAX_MUTATION_SIZE = 500000; - public static final boolean DEFAULT_ROW_KEY_ORDER_SALTED_TABLE = true; // Merge sort on client to ensure salted tables are row key ordered public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also. 
@@ -191,6 +191,7 @@ public class QueryServicesOptions { private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName(); public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = false; + public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false; private final Configuration config; @@ -229,7 +230,7 @@ public class QueryServicesOptions { .setIfUnset(STATS_UPDATE_FREQ_MS_ATTRIB, DEFAULT_STATS_UPDATE_FREQ_MS) .setIfUnset(CALL_QUEUE_ROUND_ROBIN_ATTRIB, DEFAULT_CALL_QUEUE_ROUND_ROBIN) .setIfUnset(MAX_MUTATION_SIZE_ATTRIB, DEFAULT_MAX_MUTATION_SIZE) - .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_ROW_KEY_ORDER_SALTED_TABLE) + .setIfUnset(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER) .setIfUnset(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES) .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS) .setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD) @@ -246,6 +247,7 @@ public class QueryServicesOptions { .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED) .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY) .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX) + .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER) ; // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set @@ -352,10 +354,6 @@ public class QueryServicesOptions { return set(MUTATE_BATCH_SIZE_ATTRIB, mutateBatchSize); } - public QueryServicesOptions setRowKeyOrderSaltedTable(boolean rowKeyOrderSaltedTable) { - return set(ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, rowKeyOrderSaltedTable); - } - public QueryServicesOptions setDropMetaData(boolean dropMetadata) { return set(DROP_METADATA_ATTRIB, dropMetadata); } @@ -452,6 +450,10 @@ public class QueryServicesOptions { public boolean isUseByteBasedRegex() { return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX); } + + public int getScanCacheSize() { + return config.getInt(SCAN_CACHE_SIZE_ATTRIB, DEFAULT_SCAN_CACHE_SIZE); + } public QueryServicesOptions setMaxServerCacheTTLMs(int ttl) { return set(MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, ttl); @@ -531,4 +533,9 @@ public class QueryServicesOptions { config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag); return this; } + + public QueryServicesOptions setForceRowKeyOrder(boolean forceRowKeyOrder) { + config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder); + return this; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index e414039..22208f1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1696,7 +1696,7 @@ public class MetaDataClient { splits = getSplitKeys(connection.getQueryServices().getAllTableRegions(parent.getPhysicalName().getBytes())); } else { splits = SchemaUtil.processSplits(splits, pkColumns, saltBucketNum, 
connection.getQueryServices().getProps().getBoolean( - QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, QueryServicesOptions.DEFAULT_ROW_KEY_ORDER_SALTED_TABLE)); + QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER)); } MetaDataMutationResult result = connection.getQueryServices().createTable( tableMetaData, http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 2268866..a1473ef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.util; +import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY; +import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS; import java.io.IOException; @@ -38,7 +40,9 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.filter.BooleanExpressionFilter; @@ -46,6 +50,8 @@ import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; import org.apache.phoenix.query.QueryConstants; +import 
org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.RowKeySchema; @@ -658,4 +664,20 @@ public class ScanUtil { } return filterIterator; } + + public static boolean isRoundRobinPossible(OrderBy orderBy, StatementContext context) throws SQLException { + int fetchSize = context.getStatement().getFetchSize(); + /* + * Selecting underlying scanners in a round-robin fashion is possible if there is no ordering of rows needed, + * not even row key order. Also no point doing round robin of scanners if fetch size + * is 1. + */ + return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context) && orderBy.getOrderByExpressions().isEmpty(); + } + + public static boolean shouldRowsBeInRowKeyOrder(OrderBy orderBy, StatementContext context) { + boolean forceRowKeyOrder = context.getConnection().getQueryServices().getProps() + .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER); + return forceRowKeyOrder || orderBy == FWD_ROW_KEY_ORDER_BY || orderBy == REV_ROW_KEY_ORDER_BY; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 77eb237..7be8eae 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -56,6 +56,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.BaseConnectionlessQueryTest; import 
org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnAlreadyExistsException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -1732,5 +1733,64 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query); assertFalse("Expected group by not to be order preserving: " + query, plan.getGroupBy().isOrderPreserving()); } - } + } + + @Test + public void testUseRoundRobinIterator() throws Exception { + Properties props = new Properties(); + props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))"); + String[] queries = { + "SELECT 1 FROM T ", + "SELECT 1 FROM T WHERE V = 'c'", + "SELECT 1 FROM T WHERE (k1,k2, k3) > ('a', 'ab', 1)", + }; + String query; + for (int i = 0; i < queries.length; i++) { + query = queries[i]; + QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query); + assertTrue("Expected plan to use round robin iterator " + query, plan.useRoundRobinIterator()); + } + } + + @Test + public void testForcingRowKeyOrderNotUseRoundRobinIterator() throws Exception { + Properties props = new Properties(); + props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(true)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))"); + String[] queries = { + "SELECT 1 FROM T ", + "SELECT 1 FROM T WHERE V = 'c'", + "SELECT 1 FROM T WHERE (k1, k2, k3) > ('a', 
'ab', 1)", + }; + String query; + for (int i = 0; i < queries.length; i++) { + query = queries[i]; + QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query); + assertFalse("Expected plan to not use round robin iterator " + query, plan.useRoundRobinIterator()); + } + } + + @Test + public void testPlanForOrderByOrGroupByNotUseRoundRobin() throws Exception { + Properties props = new Properties(); + props.setProperty(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE t (k1 char(2) not null, k2 varchar not null, k3 integer not null, v varchar, constraint pk primary key(k1,k2,k3))"); + String[] queries = { + "SELECT 1 FROM T ORDER BY K1", + "SELECT 1 FROM T WHERE V = 'c' ORDER BY K1, K2", + "SELECT 1 FROM T WHERE (k1,k2, k3) > ('a', 'ab', 1) ORDER BY V", + "SELECT 1 FROM T GROUP BY V", + "SELECT 1 FROM T GROUP BY K1, V, K2 ORDER BY V", + }; + String query; + for (int i = 0; i < queries.length; i++) { + query = queries[i]; + QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query); + assertFalse("Expected plan to not use round robin iterator " + query, plan.useRoundRobinIterator()); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java index 29e14bf..e645383 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java @@ -600,7 +600,7 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest { public static 
void doSetup() throws Exception { Map<String,String> props = Maps.newHashMapWithExpectedSize(1); // enables manual splitting on salted tables - props.put(QueryServices.ROW_KEY_ORDER_SALTED_TABLE_ATTRIB, Boolean.toString(false)); + props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); initDriver(new ReadOnlyProps(props.entrySet().iterator())); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index f929eb4..6f2a2f1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -411,6 +411,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { public List<List<Scan>> getScans() { return null; } + + @Override + public boolean useRoundRobinIterator() { + return false; + } }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices())); List<KeyRange> keyRanges = parallelIterators.getSplits(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb8d7664/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index c1b7f99..9fee78f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -59,6 +59,7 @@ public final class 
QueryServicesTestImpl extends BaseQueryServicesImpl { * value overwhelms our mini clusters. */ public static final int DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS = 4; + public static final boolean DEFAULT_FORCE_ROWKEY_ORDER = true; public QueryServicesTestImpl(ReadOnlyProps defaultProps) { @@ -79,7 +80,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS) .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC) .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE) - .setRowKeyOrderSaltedTable(true) .setMaxServerCacheTTLMs(DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS) .setMasterInfoPort(DEFAULT_MASTER_INFO_PORT) .setRegionServerInfoPort(DEFAULT_REGIONSERVER_INFO_PORT) @@ -88,7 +88,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setWALEditCodec(DEFAULT_WAL_EDIT_CODEC) .setDropMetaData(DEFAULT_DROP_METADATA) .setMaxClientMetaDataCacheSize(DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE) - .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE); + .setMaxServerMetaDataCacheSize(DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE) + .setForceRowKeyOrder(DEFAULT_FORCE_ROWKEY_ORDER); } public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) {
