Repository: phoenix
Updated Branches:
  refs/heads/calcite 71672620f -> 8c19e1c13
PHOENIX-2207 Load scanner caches in parallel when using stats and round robin iterator

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4b9e740e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4b9e740e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4b9e740e

Branch: refs/heads/calcite
Commit: 4b9e740efc5bfb3fb13ec2542bdfe2157b29f65b
Parents: 6ada2ae
Author: Samarth <[email protected]>
Authored: Fri Sep 4 10:05:15 2015 -0700
Committer: Samarth <[email protected]>
Committed: Fri Sep 4 10:05:15 2015 -0700

----------------------------------------------------------------------
 .../iterate/MockParallelIteratorFactory.java   | 47 +++++++++
 .../iterate/MockTableResultIterator.java       | 66 ++++++++++++
 .../iterate/RoundRobinResultIteratorIT.java    | 40 ++++++-
 .../RoundRobinResultIteratorWithStatsIT.java   | 104 +++++++++++++++++++
 .../phoenix/iterate/BaseResultIterators.java   | 14 ++-
 .../apache/phoenix/jdbc/PhoenixConnection.java | 16 +++
 .../apache/phoenix/jdbc/PhoenixResultSet.java  | 6 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java  | 3 +-
 8 files changed, 290 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
new file mode 100644
index 0000000..1161bae
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockParallelIteratorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.schema.PTable; + +/** + * Iterator factory that creates {@code MockTableResultIterator} + */ +public class MockParallelIteratorFactory implements ParallelIteratorFactory { + private static final AtomicInteger counter = new AtomicInteger(1); + private PTable table; + + @Override + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, + String physicalTableName) throws SQLException { + return new MockTableResultIterator(String.valueOf(counter.incrementAndGet()), table); + } + + public void setTable(PTable table) { + this.table = table; + } + +} + + + http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java new file mode 100644 index 0000000..f0da101 --- 
/dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/MockTableResultIterator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.Tuple; + +/** + * Mock result iterator that returns its id as a string in a {@code Tuple} when {@link #next()} and {@link #peek()} are called. 
+ */ +public class MockTableResultIterator implements PeekingResultIterator { + + private final Tuple tuple; + + public MockTableResultIterator(String id, PTable table) { + TupleProjector projector = new TupleProjector(table); + List<Cell> result = new ArrayList<>(); + result.add(new KeyValue(Bytes.toBytes(id), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(id))); + this.tuple = projector.projectResults(new ResultTuple(Result.create(result))); + } + + @Override + public Tuple next() throws SQLException { + return tuple; + } + + @Override + public void explain(List<String> planSteps) {} + + @Override + public void close() throws SQLException {} + + @Override + public Tuple peek() throws SQLException { + return tuple; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java index 224ed95..dd384cc 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java @@ -28,8 +28,10 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -39,12 +41,15 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import 
org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.BeforeClass; @@ -395,5 +400,38 @@ public class RoundRobinResultIteratorIT extends BaseHBaseManagedTimeIT { assertEquals(numFetches, roundRobinItr.getNumberOfParallelFetches()); } } - + + @Test + public void testIteratorsPickedInRoundRobinFashionForSaltedTable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String testTable = "testIteratorsPickedInRoundRobinFashionForSaltedTable".toUpperCase(); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + testTable + "(K VARCHAR PRIMARY KEY) SALT_BUCKETS = 8"); + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + MockParallelIteratorFactory parallelIteratorFactory = new MockParallelIteratorFactory(); + phxConn.setIteratorFactory(parallelIteratorFactory); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + testTable); + StatementContext ctx = rs.unwrap(PhoenixResultSet.class).getContext(); + PTable table = ctx.getResolver().getTables().get(0).getTable(); + parallelIteratorFactory.setTable(table); + PhoenixStatement pstmt = stmt.unwrap(PhoenixStatement.class); + int numIterators = pstmt.getQueryPlan().getSplits().size(); + assertEquals(8, numIterators); + int numFetches = 2 * numIterators; + List<String> iteratorOrder = new ArrayList<>(numFetches); + for (int i = 1; i <= numFetches; i++) { + rs.next(); + iteratorOrder.add(rs.getString(1)); + } + /* + * Because TableResultIterators are created in parallel in multiple threads, their relative order is not + * deterministic. 
However, once the iterators are assigned to a RoundRobinResultIterator, the order in which + * the next iterator is picked is deterministic - i1, i2, .. i7, i8, i1, i2, .. i7, i8, i1, i2, .. + */ + for (int i = 0; i < numIterators; i++) { + assertEquals(iteratorOrder.get(i), iteratorOrder.get(i + numIterators)); + } + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorWithStatsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorWithStatsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorWithStatsIT.java new file mode 100644 index 0000000..ddf17ba --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorWithStatsIT.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class RoundRobinResultIteratorWithStatsIT extends BaseOwnClusterHBaseManagedTimeIT { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(3); + // Must update config before starting server + props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(70000)); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(10000)); + props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString()); + /* + * Don't force row key order. This causes RoundRobinResultIterator to be used if there was no order by specified + * on the query. 
+ */ + props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testRoundRobinBehavior() throws Exception { + int nRows = 30000; + try (Connection conn = DriverManager.getConnection(getUrl())) { + String testTable = "testRoundRobinBehavior".toUpperCase(); + conn.createStatement().execute("CREATE TABLE " + testTable + "(K VARCHAR PRIMARY KEY)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + testTable + " VALUES(?)"); + for (int i = 1; i <= nRows; i++) { + stmt.setString(1, i + ""); + stmt.executeUpdate(); + if ((i % 2000) == 0) { + conn.commit(); + } + } + conn.commit(); + conn.createStatement().execute("UPDATE STATISTICS " + testTable); + PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); + MockParallelIteratorFactory parallelIteratorFactory = new MockParallelIteratorFactory(); + phxConn.setIteratorFactory(parallelIteratorFactory); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + testTable); + StatementContext ctx = rs.unwrap(PhoenixResultSet.class).getContext(); + PTable table = ctx.getResolver().getTables().get(0).getTable(); + parallelIteratorFactory.setTable(table); + PhoenixStatement pstmt = stmt.unwrap(PhoenixStatement.class); + int numIterators = pstmt.getQueryPlan().getSplits().size(); + assertTrue(numIterators > 1); + int numFetches = 2 * numIterators; + List<String> iteratorOrder = new ArrayList<>(numFetches); + for (int i = 1; i <= numFetches; i++) { + rs.next(); + iteratorOrder.add(rs.getString(1)); + } + /* + * Because TableResultIterators are created in parallel in multiple threads, their relative order is not + * deterministic. However, once the iterators are assigned to a RoundRobinResultIterator, the order in which + * the next iterator is picked is deterministic - i1, i2, .. i7, i8, i1, i2, .. i7, i8, i1, i2, .. 
+ */ + for (int i = 0; i < numIterators; i++) { + assertEquals(iteratorOrder.get(i), iteratorOrder.get(i + numIterators)); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index a0aefaa..b5243cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -666,9 +666,19 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) { + private void addIterator(List<PeekingResultIterator> parentIterators, List<PeekingResultIterator> childIterators) throws SQLException { if (!childIterators.isEmpty()) { - parentIterators.add(ConcatResultIterator.newIterator(childIterators)); + if (plan.useRoundRobinIterator()) { + /* + * When using a round robin iterator we shouldn't concatenate the iterators together. This is because a + * round robin iterator should be calling next() on these iterators directly after selecting them in a + * round robin fashion. This helps take advantage of loading the underlying scanners' caches in parallel + * as well as preventing errors arising out of scanner lease expirations. 
+ */ + parentIterators.addAll(childIterators); + } else { + parentIterators.add(ConcatResultIterator.newIterator(childIterators)); + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 0cd76fe..90308c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -64,6 +64,7 @@ import org.apache.phoenix.execute.CommitException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.function.FunctionArgumentType; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser; import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.query.ConnectionQueryServices; @@ -141,6 +142,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private Map<String, String> customTracingAnnotations = emptyMap(); private final boolean isRequestLevelMetricsEnabled; private final boolean isDescVarLengthRowKeyUpgrade; + private ParallelIteratorFactory parallelIteratorFactory; static { @@ -906,4 +908,18 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public boolean isDescVarLengthRowKeyUpgrade() { return isDescVarLengthRowKeyUpgrade; } + + /** + * Added for tests only. Do not use this elsewhere. + */ + public ParallelIteratorFactory getIteratorFactory() { + return parallelIteratorFactory; + } + + /** + * Added for testing purposes. Do not use this elsewhere. 
+ */ + public void setIteratorFactory(ParallelIteratorFactory parallelIteratorFactory) { + this.parallelIteratorFactory = parallelIteratorFactory; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index da06370..60a6957 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -39,13 +39,11 @@ import java.sql.Time; import java.sql.Timestamp; import java.text.Format; import java.util.Calendar; -import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; @@ -1294,5 +1292,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho readMetricsQueue.clearMetrics(); overAllQueryMetrics.reset(); } + + public StatementContext getContext() { + return context; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4b9e740e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 056263a..98a2903 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
@@ -57,6 +57,7 @@ import org.apache.phoenix.compile.DropSequenceCompiler;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.ListJarsQueryPlan;
 import org.apache.phoenix.compile.MutationPlan;
@@ -377,7 +378,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 resolver = FromCompiler.getResolverForQuery(transformedSelect, stmt.getConnection());
                 select = StatementNormalizer.normalize(transformedSelect, resolver);
             }
-            QueryPlan plan = new QueryCompiler(stmt, select, resolver).compile();
+            QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile();
             plan.getContext().getSequenceManager().validateSequences(seqAction);
             return plan;
         }
