Repository: phoenix
Updated Branches:
  refs/heads/master 13aa61a86 -> 6f16a6a62


PHOENIX-1570 Data missing when using local index (Maryann Xue, James Taylor)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6f16a6a6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6f16a6a6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6f16a6a6

Branch: refs/heads/master
Commit: 6f16a6a629fdeca587f38fe6e1349a2b6f0fabd8
Parents: 13aa61a
Author: James Taylor <[email protected]>
Authored: Mon Jan 12 12:10:14 2015 -0800
Committer: James Taylor <[email protected]>
Committed: Mon Jan 12 12:10:41 2015 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/index/LocalIndexIT.java     | 12 ++--
 .../apache/phoenix/compile/GroupByCompiler.java |  7 +--
 .../apache/phoenix/execute/AggregatePlan.java   | 59 +++++++++++---------
 .../expression/ProjectedColumnExpression.java   | 41 +++++++-------
 .../phoenix/compile/QueryCompilerTest.java      | 18 +++---
 5 files changed, 69 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index a9e7304..2a9a82d 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -59,7 +59,7 @@ import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
@@ -480,10 +480,6 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
             assertEquals("z", rs.getString("V1"));
             
             query = "SELECT v1,sum(k3) from " + 
TestUtil.DEFAULT_DATA_TABLE_FULL_NAME + " where v1 <='z'  group by v1 order by 
v1";
-            PhoenixPreparedStatement statement = 
conn1.prepareStatement(query).unwrap(PhoenixPreparedStatement.class);
-            QueryPlan plan = statement.compileQuery("EXPLAIN " + query);
-            assertTrue(query, 
plan.getContext().getScan().getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS)
 == null);
-            assertTrue(query, 
plan.getContext().getScan().getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS)
 != null);
             
             rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
             assertEquals(
@@ -492,7 +488,11 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT {
                         + "    SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY 
[V1]\nCLIENT MERGE SORT",
                 QueryUtil.getExplainPlan(rs));
             
-            rs = conn1.createStatement().executeQuery(query);
+            PhoenixStatement stmt = 
conn1.createStatement().unwrap(PhoenixStatement.class);
+            rs = stmt.executeQuery(query);
+            QueryPlan plan = stmt.getQueryPlan();
+            assertEquals(TestUtil.DEFAULT_INDEX_TABLE_NAME, 
plan.getContext().getCurrentTable().getTable().getName().getString());
+            
assertEquals(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS, 
plan.getGroupBy().getScanAttribName());
             assertTrue(rs.next());
             assertEquals("a", rs.getString(1));
             assertEquals(5, rs.getInt(2));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index ecb238a..4f1ba5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -27,7 +27,6 @@ import org.apache.http.annotation.Immutable;
 import org.apache.phoenix.compile.TrackOrderPreservingExpressionCompiler.Entry;
 import 
org.apache.phoenix.compile.TrackOrderPreservingExpressionCompiler.Ordering;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector;
@@ -38,12 +37,12 @@ import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PVarchar;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import org.apache.phoenix.schema.types.PVarchar;
 
 /**
  * 
@@ -258,8 +257,6 @@ public class GroupByCompiler {
             }
         }
 
-        // Set attribute with serialized expressions for coprocessor
-        GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), 
groupExprAttribName, keyExpressions);
         GroupBy groupBy = new 
GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(expressions).setKeyExpressions(keyExpressions).build();
         return groupBy;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 8627bfb..617cc48 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -28,6 +28,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
@@ -53,8 +54,8 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PInteger;
 
 
 
@@ -143,32 +144,36 @@ public class AggregatePlan extends BaseQueryPlan {
     protected ResultIterator newIterator() throws SQLException {
         if (groupBy.isEmpty()) {
             
UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
-        } else if (limit != null && orderBy.getOrderByExpressions().isEmpty() 
&& having == null
-                && (  (   statement.isDistinct() && ! statement.isAggregate() )
-                   || ( ! statement.isDistinct() && (   
context.getAggregationManager().isEmpty()
-                                                     || 
BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS.equals(groupBy.getScanAttribName())
 ) ) ) ) {
-            /*
-             * Optimization to early exit from the scan for a GROUP BY or 
DISTINCT with a LIMIT.
-             * We may exit early according to the LIMIT specified if the query 
has:
-             * 1) No ORDER BY clause (or the ORDER BY was optimized out). We 
cannot exit
-             *    early if there's an ORDER BY because the first group may be 
found last
-             *    in the scan.
-             * 2) No HAVING clause, since we execute the HAVING on the client 
side. The LIMIT
-             *    needs to be evaluated *after* the HAVING.
-             * 3) DISTINCT clause with no GROUP BY. We cannot exit early if 
there's a
-             *    GROUP BY, as the GROUP BY is processed on the client-side 
post aggregation
-             *    if a DISTNCT has a GROUP BY. Otherwise, since there are no 
aggregate
-             *    functions in a DISTINCT, we can exit early regardless of if 
the
-             *    groups are in row key order or unordered.
-             * 4) GROUP BY clause with no aggregate functions. This is in the 
same category
-             *    as (3). If we're using aggregate functions, we need to look 
at all the
-             *    rows, as otherwise we'd exit early with incorrect aggregate 
function
-             *    calculations.
-             * 5) GROUP BY clause along the pk axis, as the rows are processed 
in row key
-             *    order, so we can early exit, even when aggregate functions 
are used, as
-             *    the rows in the group are contiguous.
-             */
-            
context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, 
PInteger.INSTANCE.toBytes(limit));
+        } else {
+            // Set attribute with serialized expressions for coprocessor
+            
GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), 
groupBy.getScanAttribName(), groupBy.getKeyExpressions());
+            if (limit != null && orderBy.getOrderByExpressions().isEmpty() && 
having == null
+                    && (  (   statement.isDistinct() && ! 
statement.isAggregate() )
+                            || ( ! statement.isDistinct() && (   
context.getAggregationManager().isEmpty()
+                                                              || 
BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS.equals(groupBy.getScanAttribName())
 ) ) ) ) {
+                /*
+                 * Optimization to early exit from the scan for a GROUP BY or 
DISTINCT with a LIMIT.
+                 * We may exit early according to the LIMIT specified if the 
query has:
+                 * 1) No ORDER BY clause (or the ORDER BY was optimized out). 
We cannot exit
+                 *    early if there's an ORDER BY because the first group may 
be found last
+                 *    in the scan.
+                 * 2) No HAVING clause, since we execute the HAVING on the 
client side. The LIMIT
+                 *    needs to be evaluated *after* the HAVING.
+                 * 3) DISTINCT clause with no GROUP BY. We cannot exit early 
if there's a
+                 *    GROUP BY, as the GROUP BY is processed on the 
client-side post aggregation
+                 *    if a DISTINCT has a GROUP BY. Otherwise, since there are 
no aggregate
+                 *    functions in a DISTINCT, we can exit early regardless of 
if the
+                 *    groups are in row key order or unordered.
+                 * 4) GROUP BY clause with no aggregate functions. This is in 
the same category
+                 *    as (3). If we're using aggregate functions, we need to 
look at all the
+                 *    rows, as otherwise we'd exit early with incorrect 
aggregate function
+                 *    calculations.
+                 * 5) GROUP BY clause along the pk axis, as the rows are 
processed in row key
+                 *    order, so we can early exit, even when aggregate 
functions are used, as
+                 *    the rows in the group are contiguous.
+                 */
+                
context.getScan().setAttribute(BaseScannerRegionObserver.GROUP_BY_LIMIT, 
PInteger.INSTANCE.toBytes(limit));
+            }
         }
         ParallelIterators parallelIterators = new ParallelIterators(this, 
null, wrapParallelIteratorFactory());
         splits = parallelIterators.getSplits();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 8a2f6d1..d090203 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.execute.TupleProjector;
@@ -38,29 +39,22 @@ public class ProjectedColumnExpression extends 
ColumnExpression {
        ValueBitSet bitSet;
        private int position;
        private String displayName;
+       private final Collection<PColumn> columns;
        
        public ProjectedColumnExpression() {
+        this.columns = Collections.emptyList();
        }
 
        public ProjectedColumnExpression(PColumn column, PTable table, String 
displayName) {
-               super(column);
-               this.schema = buildSchema(table);
-               this.bitSet = ValueBitSet.newInstance(schema);
-               this.position = column.getPosition() - 
table.getPKColumns().size();
-               this.displayName = displayName;
+               this(column, table.getColumns(), column.getPosition() - 
table.getPKColumns().size(), displayName);
        }
     
     public ProjectedColumnExpression(PColumn column, Collection<PColumn> 
columns, int position, String displayName) {
         super(column);
-        this.schema = buildSchema(columns);
-        this.bitSet = ValueBitSet.newInstance(schema);
+        this.columns = columns;
         this.position = position;
         this.displayName = displayName;
     }
-
-       private static KeyValueSchema buildSchema(PTable table) {
-        return buildSchema(table.getColumns());
-    }
     
     public static KeyValueSchema buildSchema(Collection<PColumn> columns) {
         KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
@@ -73,6 +67,10 @@ public class ProjectedColumnExpression extends 
ColumnExpression {
     }
     
     public KeyValueSchema getSchema() {
+        if (this.schema == null) {
+            this.schema = buildSchema(columns);
+            this.bitSet = ValueBitSet.newInstance(schema);            
+        }
        return schema;
     }
     
@@ -80,11 +78,15 @@ public class ProjectedColumnExpression extends 
ColumnExpression {
        return position;
     }
     
-   @Override
+    @Override
+    public String toString() {
+        return displayName;
+    }
+       
+       @Override
     public int hashCode() {
         final int prime = 31;
-        int result = 1;
-        result = prime * result + schema.hashCode();
+        int result = super.hashCode();
         result = prime * result + position;
         return result;
     }
@@ -92,22 +94,17 @@ public class ProjectedColumnExpression extends 
ColumnExpression {
     @Override
     public boolean equals(Object obj) {
         if (this == obj) return true;
-        if (obj == null) return false;
+        if (!super.equals(obj)) return false;
         if (getClass() != obj.getClass()) return false;
         ProjectedColumnExpression other = (ProjectedColumnExpression)obj;
-        if (!schema.equals(other.schema)) return false;
         if (position != other.position) return false;
         return true;
     }
 
     @Override
-    public String toString() {
-        return displayName;
-    }
-       
-       @Override
        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
         try {
+            KeyValueSchema schema = getSchema();
             TupleProjector.decodeProjectedValue(tuple, ptr);
             int maxOffset = ptr.getOffset() + ptr.getLength();
             bitSet.clear();
@@ -136,7 +133,7 @@ public class ProjectedColumnExpression extends 
ColumnExpression {
     @Override
     public void write(DataOutput output) throws IOException {
         super.write(output);
-        schema.write(output);
+        getSchema().write(output);
         output.writeInt(position);
         output.writeUTF(displayName);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f16a6a6/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 3b93954..a2779a2 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -428,6 +428,11 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
     }
 
     private Scan compileQuery(String query, List<Object> binds) throws 
SQLException {
+        QueryPlan plan = getQueryPlan(query, binds);
+        return plan.getContext().getScan();
+    }
+    
+    private QueryPlan getQueryPlan(String query, List<Object> binds) throws 
SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
@@ -435,8 +440,7 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
             for (Object bind : binds) {
                 statement.setObject(1, bind);
             }
-            QueryPlan plan = statement.compileQuery(query);
-            return plan.getContext().getScan();
+            return statement.compileQuery(query);
         } finally {
             conn.close();
         }
@@ -455,9 +459,8 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
         };
         List<Object> binds = Collections.emptyList();
         for (String query : queries) {
-            Scan scan = compileQuery(query, binds);
-            assertTrue(query, 
scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) 
!= null);
-            assertTrue(query, 
scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) == 
null);
+            QueryPlan plan = getQueryPlan(query, binds);
+            assertEquals(plan.getGroupBy().getScanAttribName(), 
BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS);
         }
     }
 
@@ -638,9 +641,8 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
         };
         List<Object> binds = Collections.emptyList();
         for (String query : queries) {
-            Scan scan = compileQuery(query, binds);
-            assertTrue(query, 
scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS) 
== null);
-            assertTrue(query, 
scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS) != 
null);
+            QueryPlan plan = getQueryPlan(query, binds);
+            assertEquals(plan.getGroupBy().getScanAttribName(), 
BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS);
         }
     }
     

Reply via email to