PHOENIX-1315 Optimize query for Pig loader

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5df8d1ec
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5df8d1ec
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5df8d1ec

Branch: refs/heads/4.0
Commit: 5df8d1ec4a2d755365738f9e0e6e8310bf96d83e
Parents: 8840af6
Author: James Taylor <jtay...@salesforce.com>
Authored: Sun Oct 5 09:53:14 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Sun Oct 5 09:53:14 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/EvaluationOfORIT.java       |  9 +--
 .../apache/phoenix/end2end/ReverseScanIT.java   |  4 +-
 ...ipRangeParallelIteratorRegionSplitterIT.java |  5 ++
 .../index/balancer/IndexLoadBalancerIT.java     | 13 +++--
 .../org/apache/phoenix/compile/QueryPlan.java   |  3 +
 .../apache/phoenix/execute/AggregatePlan.java   |  6 ++
 .../phoenix/execute/DegenerateQueryPlan.java    | 12 +++-
 .../apache/phoenix/execute/HashJoinPlan.java    |  5 ++
 .../org/apache/phoenix/execute/ScanPlan.java    |  8 +++
 .../phoenix/iterate/ConcatResultIterator.java   | 29 ++++++++++
 .../iterate/LookAheadResultIterator.java        | 21 +++++++
 .../phoenix/iterate/ParallelIterators.java      | 39 +++----------
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  6 ++
 .../phoenix/pig/PhoenixHBaseLoaderIT.java       | 24 ++++----
 .../phoenix/pig/hadoop/PhoenixInputFormat.java  | 13 +++--
 .../phoenix/pig/hadoop/PhoenixInputSplit.java   | 60 +++++++++++++++-----
 .../phoenix/pig/hadoop/PhoenixRecordReader.java | 25 ++++----
 17 files changed, 196 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
index 052ff43..0e59542 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/EvaluationOfORIT.java
@@ -28,21 +28,22 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(BaseHBaseManagedTimeIT.class)
+@Category(HBaseManagedTimeTest.class)
 public class EvaluationOfORIT extends BaseHBaseManagedTimeIT{
                
        @Test
        public void testPKOrNotPKInOREvaluation() throws SQLException {
-           Properties props = new Properties(TEST_PROPERTIES);
+           Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
            Connection conn = DriverManager.getConnection(getUrl(), props);     
    
            conn.setAutoCommit(false);
            
-            String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50) NOT NULL)";
+            String create = "CREATE TABLE DIE ( ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR(50))";
             PreparedStatement createStmt = conn.prepareStatement(create);
-            createStmt.executeUpdate();
+            createStmt.execute();
             PreparedStatement stmt = conn.prepareStatement(
                     "upsert into " +
                     "DIE VALUES (?, ?)");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
index f7409a9..f738773 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
@@ -47,8 +47,8 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Maps;
 
-@Category(HBaseManagedTimeTest.class)
-public class ReverseScanIT extends BaseClientManagedTimeIT {
+@Category(ClientManagedTimeTest.class)
+public class ReverseScanIT extends BaseHBaseManagedTimeIT {
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
     public static void doSetup() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 3d057ae..18d7910 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -432,6 +432,11 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
             public boolean isRowKeyOrdered() {
                 return true;
             }
+
+            @Override
+            public List<List<Scan>> getScans() {
+                return null;
+            }
             
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
         List<KeyRange> keyRanges = parallelIterators.getSplits();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java
index b142a7f..d534b6a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/balancer/IndexLoadBalancerIT.java
@@ -84,12 +84,15 @@ public class IndexLoadBalancerIT {
 
     @AfterClass
     public static void tearDownAfterClass() throws Exception {
-        if (admin != null) {
-            admin.disableTables(".*");
-            admin.deleteTables(".*");
-            admin.close();
+        try {
+            if (admin != null) {
+                admin.disableTables(".*");
+                admin.deleteTables(".*");
+                admin.close();
+            }
+        } finally {
+            UTIL.shutdownMiniCluster();
         }
-        UTIL.shutdownMiniCluster();
     }
 
     @Test(timeout = 180000)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index 271ba30..a76993c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -61,6 +62,8 @@ public interface QueryPlan extends StatementPlan {
 
     List<KeyRange> getSplits();
 
+    List<List<Scan>> getScans();
+
     FilterableStatement getStatement();
 
     public boolean isDegenerate();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 9f294a1..d7a90fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -69,6 +69,7 @@ public class AggregatePlan extends BaseQueryPlan {
     private final Aggregators aggregators;
     private final Expression having;
     private List<KeyRange> splits;
+    private List<List<Scan>> scans;
 
     public AggregatePlan(
             StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
@@ -88,6 +89,11 @@ public class AggregatePlan extends BaseQueryPlan {
         return splits;
     }
 
+    @Override
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
     private static class OrderingResultIteratorFactory implements 
ParallelIteratorFactory {
         private final QueryServices services;
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 80c4727..70e59b9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -21,13 +21,16 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.*;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.query.*;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
 
 public class DegenerateQueryPlan extends BaseQueryPlan {
@@ -43,6 +46,11 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
     }
 
     @Override
+    public List<List<Scan>> getScans() {
+        return Collections.emptyList();
+    }
+
+    @Override
     protected ResultIterator newIterator() throws SQLException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 6e552d8..dcf162f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -472,6 +472,11 @@ public class HashJoinPlan implements QueryPlan {
     public boolean isRowKeyOrdered() {
         return plan.isRowKeyOrdered();
     }
+
+    @Override
+    public List<List<Scan>> getScans() {
+        return plan.getScans();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 4d2468c..12dc9ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
@@ -58,6 +59,7 @@ import org.apache.phoenix.schema.TableRef;
  */
 public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
+    private List<List<Scan>> scans;
     private boolean allowPageFilter;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
@@ -96,6 +98,11 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
+    @Override
     protected ResultIterator newIterator() throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be 
too late afterwards
         
context.getScan().setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, 
QueryConstants.TRUE);
@@ -109,6 +116,7 @@ public class ScanPlan extends BaseQueryPlan {
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
         ParallelIterators iterators = new ParallelIterators(this, 
!allowPageFilter || isOrdered ? null : limit, parallelIteratorFactory);
         splits = iterators.getSplits();
+        scans = iterators.getScans();
         if (isOrdered) {
             scanner = new MergeSortTopNResultIterator(iterators, limit, 
orderBy.getOrderByExpressions());
         } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
index f273fdf..cddf3b3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java
@@ -91,4 +91,33 @@ public class ConcatResultIterator implements 
PeekingResultIterator {
                return "ConcatResultIterator [resultIterators=" + 
resultIterators
                                + ", iterators=" + iterators + ", index=" + 
index + "]";
        }
+
+    public static PeekingResultIterator newConcatResultIterator(final List<PeekingResultIterator> concatIterators) {
+        if (concatIterators.isEmpty()) {
+            return PeekingResultIterator.EMPTY_ITERATOR;
+        } 
+        
+        if (concatIterators.size() == 1) {
+            return concatIterators.get(0);
+        }
+        return new ConcatResultIterator(new ResultIterators() {
+
+            @Override
+            public List<PeekingResultIterator> getIterators() throws 
SQLException {
+                return concatIterators;
+            }
+
+            @Override
+            public int size() {
+                return concatIterators.size();
+            }
+
+            @Override
+            public void explain(List<String> planSteps) {
+                // TODO: review what we should for explain plan here
+                concatIterators.get(0).explain(planSteps);
+            }
+            
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index d823ffd..a7f390f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -18,12 +18,33 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
 abstract public class LookAheadResultIterator implements PeekingResultIterator 
{
+    public static LookAheadResultIterator wrap(final ResultIterator iterator) {
+        return new LookAheadResultIterator() {
+
+            @Override
+            public void explain(List<String> planSteps) {
+                iterator.explain(planSteps);
+            }
+
+            @Override
+            public void close() throws SQLException {
+                iterator.close();
+            }
+
+            @Override
+            protected Tuple advance() throws SQLException {
+                return iterator.next();
+            }
+        };
+    }
+    
     private final static Tuple UNINITIALIZED = new ResultTuple();
     private Tuple next = UNINITIALIZED;
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index 7f11b79..66807b7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -271,6 +271,10 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         return splits;
     }
 
+    public List<List<Scan>> getScans() {
+        return scans;
+    }
+
     private static List<byte[]> toBoundaries(List<HRegionLocation> 
regionLocations) {
         int nBoundaries = regionLocations.size() - 1;
         List<byte[]> ranges = Lists.newArrayListWithExpectedSize(nBoundaries);
@@ -442,36 +446,6 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         return parallelScans;
     }
 
-    private static void addConcatResultIterator(List<PeekingResultIterator> 
iterators, final List<PeekingResultIterator> concatIterators) {
-        if (!concatIterators.isEmpty()) {
-            if (concatIterators.size() == 1) {
-                iterators.add(concatIterators.get(0));
-            } else {
-                // TODO: should ConcatResultIterator have a constructor that 
takes
-                // a List<PeekingResultIterator>?
-                iterators.add(new ConcatResultIterator(new ResultIterators() {
-    
-                    @Override
-                    public List<PeekingResultIterator> getIterators() throws 
SQLException {
-                        return concatIterators;
-                    }
-    
-                    @Override
-                    public int size() {
-                        return concatIterators.size();
-                    }
-    
-                    @Override
-                    public void explain(List<String> planSteps) {
-                        // TODO: review what we should for explain plan here
-                        concatIterators.get(0).explain(planSteps);
-                    }
-                    
-                }));
-            }
-        }
-    }
-    
     public static <T> List<T> reverseIfNecessary(List<T> list, boolean 
reverse) {
         if (!reverse) {
             return list;
@@ -479,6 +453,11 @@ public class ParallelIterators extends ExplainTable 
implements ResultIterators {
         return Lists.reverse(list);
     }
     
+    private static void addConcatResultIterator(List<PeekingResultIterator> iterators, final List<PeekingResultIterator> concatIterators) {
+        if (!concatIterators.isEmpty()) {
+            iterators.add(ConcatResultIterator.newConcatResultIterator(concatIterators));
+        }
+    }
     /**
      * Executes the scan in parallel across all regions, blocking until all 
scans are complete.
      * @return the result iterators for the scan of each region

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 4f67d4f..bdde415 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -32,6 +32,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.compile.ColumnProjector;
@@ -419,6 +420,11 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 }
 
                 @Override
+                public List<List<Scan>> getScans() {
+                    return Collections.emptyList();
+                }
+
+                @Override
                 public StatementContext getContext() {
                     return plan.getContext();
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java 
b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
index d82e6b0..6db97f5 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseLoaderIT.java
@@ -113,7 +113,7 @@ public class PhoenixHBaseLoaderIT {
         conn.createStatement().execute(ddl);
 
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         
         final Schema schema = pigServer.dumpSchema("A");
@@ -144,7 +144,7 @@ public class PhoenixHBaseLoaderIT {
         
         final String selectColumns = "ID,NAME";
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+                "A = load 'hbase://table/%s/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');",
                 TABLE_FULL_NAME, selectColumns, zkQuorum));
         
         Schema schema = pigServer.dumpSchema("A");
@@ -175,7 +175,7 @@ public class PhoenixHBaseLoaderIT {
         //sql query for LOAD
         final String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE 
FROM " + TABLE_FULL_NAME;
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');",
+                "A = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');",
                 sqlQuery, zkQuorum));
         
         //assert the schema.
@@ -209,7 +209,7 @@ public class PhoenixHBaseLoaderIT {
         LOG.info(String.format("Generated SQL Query [%s]",sqlQuery));
         
         pigServer.registerQuery(String.format(
-                "raw = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s') AS 
(a:chararray,b:bigdecimal,c:int,d:double);",
+                "raw = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s') AS 
(a:chararray,b:bigdecimal,c:int,d:double);",
                 sqlQuery, zkQuorum));
         
         //test the schema.
@@ -252,7 +252,7 @@ public class PhoenixHBaseLoaderIT {
          
         //load data and filter rows whose age is > 25
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using "  + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         pigServer.registerQuery("B = FILTER A BY AGE > 25;");
         
@@ -340,7 +340,7 @@ public class PhoenixHBaseLoaderIT {
         final String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE 
BAR = -1 " , TABLE_FULL_NAME);
       
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                "A = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
         
         final Iterator<Tuple> iterator = pigServer.openIterator("A");
@@ -404,7 +404,7 @@ public class PhoenixHBaseLoaderIT {
          //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         
         pigServer.registerQuery("B = GROUP A BY AGE;");
@@ -458,7 +458,7 @@ public class PhoenixHBaseLoaderIT {
          //load data and filter rows whose age is > 25
         pigServer.setBatchOn();
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://table/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", TABLE_FULL_NAME,
+                "A = load 'hbase://table/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", TABLE_FULL_NAME,
                 zkQuorum));
         
         pigServer.registerQuery("B = GROUP A BY AGE;");
@@ -471,11 +471,11 @@ public class PhoenixHBaseLoaderIT {
         //validate the data with what is stored.
         final String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM " + 
targetTable + " ORDER BY AGE";
         final ResultSet rs = conn.createStatement().executeQuery(selectQuery);
-        rs.next();
+        assertTrue(rs.next());
         assertEquals(25, rs.getInt("AGE"));
         assertEquals(0, rs.getInt("MIN_SAL"));
         assertEquals(180, rs.getInt("MAX_SAL"));
-        rs.next();
+        assertTrue(rs.next());
         assertEquals(30, rs.getInt("AGE"));
         assertEquals(0, rs.getInt("MIN_SAL"));
         assertEquals(270, rs.getInt("MAX_SAL"));
@@ -513,7 +513,7 @@ public class PhoenixHBaseLoaderIT {
         //sql query load data and filter rows whose age is > 25
         final String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS 
my_seq,ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                "A = load 'hbase://query/%s' using " + 
PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
         
         
@@ -550,7 +550,7 @@ public class PhoenixHBaseLoaderIT {
         final String sqlQuery = " SELECT UPPER(NAME) AS n FROM " + 
TABLE_FULL_NAME + " ORDER BY ID" ;
 
         pigServer.registerQuery(String.format(
-                "A = load 'hbase://query/%s' using 
org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery,
+                "A = load 'hbase://query/%s' using "  + 
PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery,
                 zkQuorum));
         
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
index d20ca6d..b58e7e3 100644
--- 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -86,8 +87,8 @@ public final class PhoenixInputFormat extends 
InputFormat<NullWritable, PhoenixR
         Preconditions.checkNotNull(qplan);
         Preconditions.checkNotNull(splits);
         final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
-        for (KeyRange split : qplan.getSplits()) {
-            psplits.add(new PhoenixInputSplit(split));
+        for (List<Scan> scans : qplan.getScans()) {
+            psplits.add(new PhoenixInputSplit(scans));
         }
         return psplits;
     }
@@ -127,10 +128,10 @@ public final class PhoenixInputFormat extends 
InputFormat<NullWritable, PhoenixR
                 Preconditions.checkNotNull(selectStatement);
                 final Statement statement = connection.createStatement();
                 final PhoenixStatement pstmt = 
statement.unwrap(PhoenixStatement.class);
-                this.queryPlan = pstmt.compileQuery(selectStatement);
-                // FIXME: why is getting the iterator necessary here, as it 
will
-                // cause the query to run.
-                this.queryPlan.iterator();
+                // Optimize the query plan so that we potentially use 
secondary indexes
+                this.queryPlan = pstmt.optimizeQuery(selectStatement);
+                // Initialize the query plan so it sets up the parallel scans
+                queryPlan.iterator();
             } catch(Exception exception) {
                 LOG.error(String.format("Failed to get the query plan with 
error [%s]",exception.getMessage()));
                 throw new RuntimeException(exception);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
index 43d69b3..b1d015a 100644
--- 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
@@ -22,12 +22,18 @@ package org.apache.phoenix.pig.hadoop;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.phoenix.query.KeyRange;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * 
@@ -36,6 +42,7 @@ import com.google.common.base.Preconditions;
  */
 public class PhoenixInputSplit extends InputSplit implements Writable {
 
+    private List<Scan> scans;
     private KeyRange keyRange;
    
     /**
@@ -48,21 +55,49 @@ public class PhoenixInputSplit extends InputSplit 
implements Writable {
     * 
     * @param scans the list of scans (one per guidepost/region chunk) covered by this split
     */
-    public PhoenixInputSplit(final KeyRange keyRange) {
-        Preconditions.checkNotNull(keyRange);
-        this.keyRange = keyRange;
+    public PhoenixInputSplit(final List<Scan> scans) {
+        Preconditions.checkNotNull(scans);
+        Preconditions.checkState(!scans.isEmpty());
+        this.scans = scans;
+        init();
+    }
+    
+    public List<Scan> getScans() {
+        return scans;
+    }
+    
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+    
+    private void init() {
+        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), 
scans.get(scans.size()-1).getStopRow());
     }
     
     @Override
     public void readFields(DataInput input) throws IOException {
-        this.keyRange = new KeyRange ();
-        this.keyRange.readFields(input);
+        int count = WritableUtils.readVInt(input);
+        scans = Lists.newArrayListWithExpectedSize(count);
+        for (int i = 0; i < count; i++) {
+            byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
+            input.readFully(protoScanBytes);
+            ClientProtos.Scan protoScan = 
ClientProtos.Scan.parseFrom(protoScanBytes);
+            Scan scan = ProtobufUtil.toScan(protoScan);
+            scans.add(scan);
+        }
+        init();
     }
     
     @Override
     public void write(DataOutput output) throws IOException {
-        Preconditions.checkNotNull(keyRange);
-        keyRange.write(output);
+        Preconditions.checkNotNull(scans);
+        WritableUtils.writeVInt(output, scans.size());
+        for (Scan scan : scans) {
+            ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
+            byte[] protoScanBytes = protoScan.toByteArray();
+            WritableUtils.writeVInt(output, protoScanBytes.length);
+            output.write(protoScanBytes);
+        }
     }
 
     @Override
@@ -75,23 +110,18 @@ public class PhoenixInputSplit extends InputSplit 
implements Writable {
         return new String[]{};
     }
 
-    /**
-     * @return Returns the keyRange.
-     */
-    public KeyRange getKeyRange() {
-        return keyRange;
-    }
-
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((keyRange == null) ? 0 : 
keyRange.hashCode());
+        result = prime * result + keyRange.hashCode();
         return result;
     }
 
     @Override
     public boolean equals(Object obj) {
+        // TODO: review: it's a reasonable check to use the keyRange,
+        // but it's not perfect. Do we need an equals impl?
         if (this == obj) { return true; }
         if (obj == null) { return false; }
         if (!(obj instanceof PhoenixInputSplit)) { return false; }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5df8d1ec/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
index 5c6afb3..2bff620 100644
--- 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.pig.hadoop;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,16 +31,18 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
 
 /**
  * RecordReader that processes the scans and returns a PhoenixRecord
@@ -94,17 +97,19 @@ public final class PhoenixRecordReader extends 
RecordReader<NullWritable,Phoenix
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context) 
throws IOException, InterruptedException {
         final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
-        final KeyRange keyRange = pSplit.getKeyRange();
-        final Scan splitScan = queryPlan.getContext().getScan();
-        final Scan scan = new Scan(splitScan);
-        ScanUtil.intersectScanRange(scan, keyRange.getLowerRange(), 
keyRange.getUpperRange(), 
queryPlan.getContext().getScanRanges().useSkipScanFilter());
+        final List<Scan> scans = pSplit.getScans();
         try {
-             TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+            List<PeekingResultIterator> iterators = 
Lists.newArrayListWithExpectedSize(scans.size());
+            for (Scan scan : scans) {
+                final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+                PeekingResultIterator peekingResultIterator = 
LookAheadResultIterator.wrap(tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = 
ConcatResultIterator.newConcatResultIterator(iterators);
             if(queryPlan.getContext().getSequenceManager().getSequenceCount() 
> 0) {
-                    this.resultIterator = new 
SequenceResultIterator(tableResultIterator, 
queryPlan.getContext().getSequenceManager());
-            } else {
-                this.resultIterator = tableResultIterator;
+                iterator = new SequenceResultIterator(iterator, 
queryPlan.getContext().getSequenceManager());
             }
+            this.resultIterator = iterator;
             this.resultSet = new PhoenixResultSet(this.resultIterator, 
queryPlan.getProjector(),queryPlan.getContext().getStatement());
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] initializing 
PhoenixRecordReader. ",e.getMessage()));

Reply via email to