PHOENIX-1315 Optimize query for Pig loader
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5df8d1ec Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5df8d1ec Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5df8d1ec Branch: refs/heads/4.0 Commit: 5df8d1ec4a2d755365738f9e0e6e8310bf96d83e Parents: 8840af6 Author: James Taylor <jtay...@salesforce.com> Authored: Sun Oct 5 09:53:14 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Sun Oct 5 09:53:14 2014 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/EvaluationOfORIT.java | 9 +-- .../apache/phoenix/end2end/ReverseScanIT.java | 4 +- ...ipRangeParallelIteratorRegionSplitterIT.java | 5 ++ .../index/balancer/IndexLoadBalancerIT.java | 13 +++-- .../org/apache/phoenix/compile/QueryPlan.java | 3 + .../apache/phoenix/execute/AggregatePlan.java | 6 ++ .../phoenix/execute/DegenerateQueryPlan.java | 12 +++- .../apache/phoenix/execute/HashJoinPlan.java | 5 ++ .../org/apache/phoenix/execute/ScanPlan.java | 8 +++ .../phoenix/iterate/ConcatResultIterator.java | 29 ++++++++++ .../iterate/LookAheadResultIterator.java | 21 +++++++ .../phoenix/iterate/ParallelIterators.java | 39 +++---------- .../apache/phoenix/jdbc/PhoenixStatement.java | 6 ++ .../phoenix/pig/PhoenixHBaseLoaderIT.java | 24 ++++---- .../phoenix/pig/hadoop/PhoenixInputFormat.java | 13 +++-- .../phoenix/pig/hadoop/PhoenixInputSplit.java | 60 +++++++++++++++----- .../phoenix/pig/hadoop/PhoenixRecordReader.java | 25 ++++---- 17 files changed, 196 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java index 052ff43..0e59542 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java @@ -28,21 +28,22 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Properties; +import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(BaseHBaseManagedTimeIT.class) +@Category(HBaseManagedTimeTest.class) public class EvaluationOfORIT extends BaseHBaseManagedTimeIT{ @Test public void testPKOrNotPKInOREvaluation() throws SQLException { - Properties props = new Properties(TEST_PROPERTIES); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); conn.setAutoCommit(false); - String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50) NOT NULL)"; + String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50))"; PreparedStatement createStmt = conn.prepareStatement(create); - createStmt.executeUpdate(); + createStmt.execute(); PreparedStatement stmt = conn.prepareStatement( "upsert into " + "DIE VALUES (?, ?)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java index f7409a9..f738773 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java @@ -47,8 +47,8 @@ import org.junit.experimental.categories.Category; import com.google.common.collect.Maps; -@Category(HBaseManagedTimeTest.class) -public class ReverseScanIT extends BaseClientManagedTimeIT { +@Category(ClientManagedTimeTest.class) +public class ReverseScanIT extends BaseHBaseManagedTimeIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) public static void doSetup() throws Exception { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java index 3d057ae..18d7910 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java @@ -432,6 +432,11 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged public boolean isRowKeyOrdered() { return true; } + + @Override + public List<List<Scan>> getScans() { + return null; + } }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices())); List<KeyRange> keyRanges = parallelIterators.getSplits(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java index b142a7f..d534b6a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java @@ -84,12 +84,15 @@ public class IndexLoadBalancerIT { @AfterClass public static void tearDownAfterClass() throws Exception { - if (admin != null) { - admin.disableTables(".*"); - admin.deleteTables(".*"); - admin.close(); + try { + if (admin != null) { + admin.disableTables(".*"); + admin.deleteTables(".*"); + admin.close(); + } + } finally { + UTIL.shutdownMiniCluster(); } - UTIL.shutdownMiniCluster(); } @Test(timeout = 180000) http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index 271ba30..a76993c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -20,6 +20,7 @@ package org.apache.phoenix.compile; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.iterate.ResultIterator; @@ -61,6 +62,8 @@ public interface QueryPlan extends StatementPlan { List<KeyRange> getSplits(); + List<List<Scan>> getScans(); + FilterableStatement getStatement(); public boolean isDegenerate(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 9f294a1..d7a90fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -69,6 +69,7 @@ public class AggregatePlan extends BaseQueryPlan { private final Aggregators aggregators; private final Expression having; private List<KeyRange> splits; + private List<List<Scan>> scans; public AggregatePlan( StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, @@ -88,6 +89,11 @@ public class AggregatePlan extends BaseQueryPlan { return splits; } + @Override + public List<List<Scan>> getScans() { + return scans; + } + private static class OrderingResultIteratorFactory implements ParallelIteratorFactory { private final QueryServices services; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java index 80c4727..70e59b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java @@ -21,13 +21,16 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; -import org.apache.phoenix.compile.*; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.query.*; +import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.TableRef; public class DegenerateQueryPlan extends BaseQueryPlan { @@ -43,6 +46,11 @@ public class DegenerateQueryPlan extends BaseQueryPlan { } @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } + + @Override protected ResultIterator newIterator() throws SQLException { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 6e552d8..dcf162f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -472,6 +472,11 @@ public class HashJoinPlan implements QueryPlan { public boolean isRowKeyOrdered() { return plan.isRowKeyOrdered(); } + + @Override + public List<List<Scan>> getScans() { + return plan.getScans(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 4d2468c..12dc9ff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -21,6 +21,7 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.RowProjector; @@ -58,6 +59,7 @@ import org.apache.phoenix.schema.TableRef; */ public class ScanPlan extends BaseQueryPlan { private List<KeyRange> splits; + private List<List<Scan>> scans; private boolean allowPageFilter; public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) { @@ -96,6 +98,11 @@ public class ScanPlan extends BaseQueryPlan { } @Override + public List<List<Scan>> getScans() { + return scans; + } + + @Override protected ResultIterator newIterator() throws SQLException { // Set any scan attributes before creating the scanner, as it will be too late afterwards context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); @@ -109,6 +116,7 @@ public class ScanPlan extends BaseQueryPlan { boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); ParallelIterators iterators = new ParallelIterators(this, !allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory); splits = iterators.getSplits(); + scans = iterators.getScans(); if (isOrdered) { scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions()); } else { http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java index f273fdf..cddf3b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java @@ -91,4 +91,33 @@ public class ConcatResultIterator implements PeekingResultIterator { return "ConcatResultIterator [resultIterators=" + resultIterators + ", iterators=" + iterators + ", index=" + index + "]"; } + + public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) { + if (concatIterators.isEmpty()) { + return PeekingResultIterator.EMPTY_ITERATOR; + } + + if (concatIterators.size() == 1) { + return concatIterators.get(0); + } + return new ConcatResultIterator(new ResultIterators() { + + @Override + public List<PeekingResultIterator> getIterators() throws SQLException { + return concatIterators; + } + + @Override + public int size() { + return concatIterators.size(); + } + + @Override + public void explain(List<String> planSteps) { + // TODO: review what we should for explain plan here + concatIterators.get(0).explain(planSteps); + } + + }); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java index d823ffd..a7f390f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java @@ -18,12 +18,33 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.List; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; abstract public class LookAheadResultIterator implements PeekingResultIterator { + public static LookAheadResultIterator wrap(final ResultIterator iterator) { + return new LookAheadResultIterator() { + + @Override + public void explain(List<String> planSteps) { + iterator.explain(planSteps); + } + + @Override + public void close() throws SQLException { + iterator.close(); + } + + @Override + protected Tuple advance() throws SQLException { + return iterator.next(); + } + }; + } + private final static Tuple UNINITIALIZED = new ResultTuple(); private Tuple next = UNINITIALIZED; http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 7f11b79..66807b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -271,6 +271,10 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { return splits; } + public List<List<Scan>> getScans() { + return scans; + } + private static List<byte[]> toBoundaries(List<HRegionLocation> regionLocations) { int nBoundaries = regionLocations.size() - 1; List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries); @@ -442,36 +446,6 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { return parallelScans; } - private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) { - if (!concatIterators.isEmpty()) { - if (concatIterators.size() == 1) { - iterators.add(concatIterators.get(0)); - } else { - // TODO: should ConcatResultIterator have a constructor that takes - // a List<PeekingResultIterator>? - iterators.add(new ConcatResultIterator(new ResultIterators() { - - @Override - public List<PeekingResultIterator> getIterators() throws SQLException { - return concatIterators; - } - - @Override - public int size() { - return concatIterators.size(); - } - - @Override - public void explain(List<String> planSteps) { - // TODO: review what we should for explain plan here - concatIterators.get(0).explain(planSteps); - } - - })); - } - } - } - public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) { if (!reverse) { return list; @@ -479,6 +453,11 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { return Lists.reverse(list); } + private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) { + if (!concatIterators.isEmpty()) { + iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators)); + } + } /** * Executes the scan in parallel across all regions, blocking until all scans are complete. * @return the result iterators for the scan of each region http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 4f67d4f..bdde415 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.compile.ColumnProjector; @@ -419,6 +420,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } + + @Override public StatementContext getContext() { return plan.getContext(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java index d82e6b0..6db97f5 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java @@ -113,7 +113,7 @@ public class PhoenixHBaseLoaderIT { conn.createStatement().execute(ddl); pigServer.registerQuery(String.format( - "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME, + "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME, zkQuorum)); final Schema schema = pigServer.dumpSchema("A"); @@ -144,7 +144,7 @@ public class PhoenixHBaseLoaderIT { final String selectColumns = "ID,NAME"; pigServer.registerQuery(String.format( - "A = load 'hbase://table/%s/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", + "A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME, selectColumns, zkQuorum)); Schema schema = pigServer.dumpSchema("A"); @@ -175,7 +175,7 @@ public class PhoenixHBaseLoaderIT { //sql query for LOAD final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM " + TABLE_FULL_NAME; pigServer.registerQuery(String.format( - "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", + "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); //assert the schema. @@ -209,7 +209,7 @@ public class PhoenixHBaseLoaderIT { LOG.info(String.format("Generated SQL Query [%s]",sqlQuery)); pigServer.registerQuery(String.format( - "raw = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);", + "raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);", sqlQuery, zkQuorum)); //test the schema. @@ -252,7 +252,7 @@ public class PhoenixHBaseLoaderIT { //load data and filter rows whose age is > 25 pigServer.registerQuery(String.format( - "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME, + "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME, zkQuorum)); pigServer.registerQuery("B = FILTER A BY AGE > 25;"); @@ -340,7 +340,7 @@ public class PhoenixHBaseLoaderIT { final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 " , TABLE_FULL_NAME); pigServer.registerQuery(String.format( - "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, + "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); final Iterator<Tuple> iterator = pigServer.openIterator("A"); @@ -404,7 +404,7 @@ public class PhoenixHBaseLoaderIT { //load data and filter rows whose age is > 25 pigServer.setBatchOn(); pigServer.registerQuery(String.format( - "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME, + "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME, zkQuorum)); pigServer.registerQuery("B = GROUP A BY AGE;"); @@ -458,7 +458,7 @@ public class PhoenixHBaseLoaderIT { //load data and filter rows whose age is > 25 pigServer.setBatchOn(); pigServer.registerQuery(String.format( - "A = load 'hbase://table/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME, + "A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME, zkQuorum)); pigServer.registerQuery("B = GROUP A BY AGE;"); @@ -471,11 +471,11 @@ public class PhoenixHBaseLoaderIT { //validate the data with what is stored. final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + targetTable + " ORDER BY AGE"; final ResultSet rs = conn.createStatement().executeQuery(selectQuery); - rs.next(); + assertTrue(rs.next()); assertEquals(25, rs.getInt("AGE")); assertEquals(0, rs.getInt("MIN_SAL")); assertEquals(180, rs.getInt("MAX_SAL")); - rs.next(); + assertTrue(rs.next()); assertEquals(30, rs.getInt("AGE")); assertEquals(0, rs.getInt("MIN_SAL")); assertEquals(270, rs.getInt("MAX_SAL")); @@ -513,7 +513,7 @@ public class PhoenixHBaseLoaderIT { //sql query load data and filter rows whose age is > 25 final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25"; pigServer.registerQuery(String.format( - "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, + "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); @@ -550,7 +550,7 @@ public class PhoenixHBaseLoaderIT { final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + TABLE_FULL_NAME + " ORDER BY ID" ; pigServer.registerQuery(String.format( - "A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, + "A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, zkQuorum)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java index d20ca6d..b58e7e3 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -86,8 +87,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR Preconditions.checkNotNull(qplan); Preconditions.checkNotNull(splits); final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); - for (KeyRange split : qplan.getSplits()) { - psplits.add(new PhoenixInputSplit(split)); + for (List<Scan> scans : qplan.getScans()) { + psplits.add(new PhoenixInputSplit(scans)); } return psplits; } @@ -127,10 +128,10 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR Preconditions.checkNotNull(selectStatement); final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); - this.queryPlan = pstmt.compileQuery(selectStatement); - // FIXME: why is getting the iterator necessary here, as it will - // cause the query to run. - this.queryPlan.iterator(); + // Optimize the query plan so that we potentially use secondary indexes + this.queryPlan = pstmt.optimizeQuery(selectStatement); + // Initialize the query plan so it sets up the parallel scans + queryPlan.iterator(); } catch(Exception exception) { LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage())); throw new RuntimeException(exception); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java index 43d69b3..b1d015a 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java @@ -22,12 +22,18 @@ package org.apache.phoenix.pig.hadoop; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.phoenix.query.KeyRange; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * @@ -36,6 +42,7 @@ import com.google.common.base.Preconditions; */ public class PhoenixInputSplit extends InputSplit implements Writable { + private List<Scan> scans; private KeyRange keyRange; /** @@ -48,21 +55,49 @@ public class PhoenixInputSplit extends InputSplit implements Writable { * * @param keyRange */ - public PhoenixInputSplit(final KeyRange keyRange) { - Preconditions.checkNotNull(keyRange); - this.keyRange = keyRange; + public PhoenixInputSplit(final List<Scan> scans) { + Preconditions.checkNotNull(scans); + Preconditions.checkState(!scans.isEmpty()); + this.scans = scans; + init(); + } + + public List<Scan> getScans() { + return scans; + } + + public KeyRange getKeyRange() { + return keyRange; + } + + private void init() { + this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow()); } @Override public void readFields(DataInput input) throws IOException { - this.keyRange = new KeyRange (); - this.keyRange.readFields(input); + int count = WritableUtils.readVInt(input); + scans = Lists.newArrayListWithExpectedSize(count); + for (int i = 0; i < count; i++) { + byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)]; + input.readFully(protoScanBytes); + ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes); + Scan scan = ProtobufUtil.toScan(protoScan); + scans.add(scan); + } + init(); } @Override public void write(DataOutput output) throws IOException { - Preconditions.checkNotNull(keyRange); - keyRange.write(output); + Preconditions.checkNotNull(scans); + WritableUtils.writeVInt(output, scans.size()); + for (Scan scan : scans) { + ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan); + byte[] protoScanBytes = protoScan.toByteArray(); + WritableUtils.writeVInt(output, protoScanBytes.length); + output.write(protoScanBytes); + } } @Override @@ -75,23 +110,18 @@ public class PhoenixInputSplit extends InputSplit implements Writable { return new String[]{}; } - /** - * @return Returns the keyRange. - */ - public KeyRange getKeyRange() { - return keyRange; - } - @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode()); + result = prime * result + keyRange.hashCode(); return result; } @Override public boolean equals(Object obj) { + // TODO: review: it's a reasonable check to use the keyRange, + // but it's not perfect. Do we need an equals impl? if (this == obj) { return true; } if (obj == null) { return false; } if (!(obj instanceof PhoenixInputSplit)) { return false; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java index 5c6afb3..2bff620 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java @@ -21,6 +21,7 @@ package org.apache.phoenix.pig.hadoop; import java.io.IOException; import java.sql.SQLException; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,16 +31,18 @@ import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.iterate.ConcatResultIterator; +import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.pig.PhoenixPigConfiguration; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.util.ScanUtil; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.collect.Lists; /** * RecordReader that process the scan and returns PhoenixRecord @@ -94,17 +97,19 @@ public final class PhoenixRecordReader extends RecordReader<NullWritable,Phoenix @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { final PhoenixInputSplit pSplit = (PhoenixInputSplit)split; - final KeyRange keyRange = pSplit.getKeyRange(); - final Scan splitScan = queryPlan.getContext().getScan(); - final Scan scan = new Scan(splitScan); - ScanUtil.intersectScanRange(scan, keyRange.getLowerRange(), keyRange.getUpperRange(), queryPlan.getContext().getScanRanges().useSkipScanFilter()); + final List<Scan> scans = pSplit.getScans(); try { - TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan); + List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size()); + for (Scan scan : scans) { + final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan); + PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); + iterators.add(peekingResultIterator); + } + ResultIterator iterator = ConcatResultIterator.newConcatResultIterator(iterators); if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { - this.resultIterator = new SequenceResultIterator(tableResultIterator, queryPlan.getContext().getSequenceManager()); - } else { - this.resultIterator = tableResultIterator; + iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); } + this.resultIterator = iterator; this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement()); } catch (SQLException e) { LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));