Repository: phoenix
Updated Branches:
  refs/heads/3.0 2d250fbff -> 9dd3bc74b


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 19a2789..66ad235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -19,8 +19,9 @@ package org.apache.phoenix.execute;
 
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -41,12 +43,14 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
@@ -61,9 +65,11 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PArrayDataType;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
@@ -75,29 +81,42 @@ public class HashJoinPlan implements QueryPlan {
     private final FilterableStatement statement;
     private final BasicQueryPlan plan;
     private final HashJoinInfo joinInfo;
-    private final List<Expression>[] hashExpressions;
-    private final Expression[] keyRangeLhsExpressions;
-    private final Expression[] keyRangeRhsExpressions;
-    private final QueryPlan[] hashPlans;
-    private final TupleProjector[] clientProjectors;
-    private final boolean[] hasFilters;
+    private final SubPlan[] subPlans;
+    private final boolean recompileWhereClause;
     private final boolean forceHashJoinRangeScan;
     private final boolean forceHashJoinSkipScan;
-
-    public HashJoinPlan(FilterableStatement statement, 
-            BasicQueryPlan plan, HashJoinInfo joinInfo,
-            List<Expression>[] hashExpressions, Expression[] keyRangeLhsExpressions,
-            Expression[] keyRangeRhsExpressions, QueryPlan[] hashPlans, 
-            TupleProjector[] clientProjectors, boolean[] hasFilters) {
+    private List<SQLCloseable> dependencies;
+    private HashCacheClient hashClient;
+    private int maxServerCacheTimeToLive;
+    private AtomicLong firstJobEndTime;
+    private List<Expression> keyRangeExpressions;
+    
+    public static HashJoinPlan create(FilterableStatement statement, 
+            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
+        if (plan instanceof BasicQueryPlan)
+            return new HashJoinPlan(statement, (BasicQueryPlan) plan, joinInfo, subPlans, joinInfo == null);
+        
+        assert (plan instanceof HashJoinPlan);
+        HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
+        assert hashJoinPlan.joinInfo == null;
+        SubPlan[] mergedSubPlans = new SubPlan[hashJoinPlan.subPlans.length + subPlans.length];
+        int i = 0;
+        for (SubPlan subPlan : hashJoinPlan.subPlans) {
+            mergedSubPlans[i++] = subPlan;
+        }
+        for (SubPlan subPlan : subPlans) {
+            mergedSubPlans[i++] = subPlan;
+        }
+        return new HashJoinPlan(statement, hashJoinPlan.plan, joinInfo, mergedSubPlans, true);
+    }
+    
+    private HashJoinPlan(FilterableStatement statement, 
+            BasicQueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
         this.statement = statement;
         this.plan = plan;
         this.joinInfo = joinInfo;
-        this.hashExpressions = hashExpressions;
-        this.keyRangeLhsExpressions = keyRangeLhsExpressions;
-        this.keyRangeRhsExpressions = keyRangeRhsExpressions;
-        this.hashPlans = hashPlans;
-        this.clientProjectors = clientProjectors;
-        this.hasFilters = hasFilters;
+        this.subPlans = subPlans;
+        this.recompileWhereClause = recompileWhereClause;
         this.forceHashJoinRangeScan = plan.getStatement().getHint().hasHint(Hint.RANGE_SCAN_HASH_JOIN);
         this.forceHashJoinSkipScan = plan.getStatement().getHint().hasHint(Hint.SKIP_SCAN_HASH_JOIN);
     }
@@ -119,42 +138,25 @@ public class HashJoinPlan implements QueryPlan {
 
     @Override
     public ResultIterator iterator() throws SQLException {
-        ImmutableBytesPtr[] joinIds = joinInfo.getJoinIds();
-        assert (joinIds.length == hashExpressions.length && joinIds.length == hashPlans.length);
-
-        final HashCacheClient hashClient = new HashCacheClient(plan.getContext().getConnection());
-        Scan scan = plan.getContext().getScan();
-        final ScanRanges ranges = plan.getContext().getScanRanges();
-
-        int count = joinIds.length;
+        int count = subPlans.length;
         ConnectionQueryServices services = getContext().getConnection().getQueryServices();
         ExecutorService executor = services.getExecutor();
-        List<Future<ServerCache>> futures = new ArrayList<Future<ServerCache>>(count);
-        List<SQLCloseable> dependencies = new ArrayList<SQLCloseable>(count);
-        List<Expression> keyRangeExpressions = new ArrayList<Expression>();
-        @SuppressWarnings("unchecked")
-        final List<ImmutableBytesWritable>[] keyRangeRhsValues = new List[count];
-        final int maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
-        final AtomicLong firstJobEndTime = new AtomicLong(0);
-        SQLException firstException = null;
+        List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count);
+        dependencies = Lists.newArrayList();
+        if (joinInfo != null) {
+            hashClient = new HashCacheClient(plan.getContext().getConnection());
+            maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+            firstJobEndTime = new AtomicLong(0);
+            keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
+        }
+        
         for (int i = 0; i < count; i++) {
             final int index = i;
-            if (keyRangeRhsExpressions[index] != null) {
-                keyRangeRhsValues[index] = new ArrayList<ImmutableBytesWritable>();
-            }
-            futures.add(executor.submit(new JobCallable<ServerCache>() {
+            futures.add(executor.submit(new JobCallable<Object>() {
 
                 @Override
-                public ServerCache call() throws Exception {
-                    QueryPlan hashPlan = hashPlans[index];
-                    ServerCache cache = hashClient.addHashCache(ranges, hashPlan.iterator(), 
-                            clientProjectors[index], hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef(), keyRangeRhsExpressions[index], keyRangeRhsValues[index]);
-                    long endTime = System.currentTimeMillis();
-                    boolean isSet = firstJobEndTime.compareAndSet(0, endTime);
-                    if (!isSet && (endTime - firstJobEndTime.get()) > maxServerCacheTimeToLive) {
-                        LOG.warn("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.");
-                    }
-                    return cache;
+                public Object call() throws Exception {
+                    return subPlans[index].execute(HashJoinPlan.this);
                 }
 
                 @Override
@@ -163,21 +165,19 @@ public class HashJoinPlan implements QueryPlan {
                 }
             }));
         }
+        
+        SQLException firstException = null;
         for (int i = 0; i < count; i++) {
             try {
-                ServerCache cache = futures.get(i).get();
-                joinIds[i].set(cache.getId());
-                dependencies.add(cache);
-                if (keyRangeRhsExpressions[i] != null) {
-                    keyRangeExpressions.add(createKeyRangeExpression(keyRangeLhsExpressions[i], keyRangeRhsExpressions[i], keyRangeRhsValues[i], plan.getContext().getTempPtr(), hasFilters[i]));
-                }
+                Object result = futures.get(i).get();
+                subPlans[i].postProcess(result, this);
             } catch (InterruptedException e) {
                 if (firstException == null) {
-                    firstException = new SQLException("Hash plan [" + i + "] execution interrupted.", e);
+                    firstException = new SQLException("Sub plan [" + i + "] execution interrupted.", e);
                 }
             } catch (ExecutionException e) {
                 if (firstException == null) {
-                    firstException = new SQLException("Encountered exception in hash plan [" + i + "] execution.", 
+                    firstException = new SQLException("Encountered exception in sub plan [" + i + "] execution.", 
                             e.getCause());
                 }
             }
@@ -186,16 +186,26 @@ public class HashJoinPlan implements QueryPlan {
             SQLCloseables.closeAllQuietly(dependencies);
             throw firstException;
         }
-
-        HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
-        if (!keyRangeExpressions.isEmpty()) {
+        
+        boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
+        if (recompileWhereClause || hasKeyRangeExpressions) {
             StatementContext context = plan.getContext();
             PTable table = context.getCurrentTable().getTable();
             ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
             context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (plan.getStatement()), plan.getContext().getConnection()));
-            WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere, keyRangeExpressions, true);
+            if (recompileWhereClause) {
+                WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere);
+            }
+            if (hasKeyRangeExpressions) {
+                WhereCompiler.compile(plan.getContext(), plan.getStatement(), viewWhere, keyRangeExpressions, true);
+            }
         }
 
+        if (joinInfo != null) {
+            Scan scan = plan.getContext().getScan();
+            HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
+        }
+        
         return plan.iterator(dependencies);
     }
 
@@ -250,40 +260,20 @@ public class HashJoinPlan implements QueryPlan {
 
     @Override
     public ExplainPlan getExplainPlan() throws SQLException {
-        List<String> mainQuerySteps = plan.getExplainPlan().getPlanSteps();
-        List<String> planSteps = Lists.newArrayList(mainQuerySteps);
-        int count = hashPlans.length;
-        planSteps.add("    PARALLEL EQUI-JOIN " + count + " HASH TABLES:");
+        List<String> planSteps = Lists.newArrayList(plan.getExplainPlan().getPlanSteps());
+        int count = subPlans.length;
+        planSteps.add("    PARALLEL EQUI/SEMI/ANTI-JOIN " + count + " TABLES:");
         for (int i = 0; i < count; i++) {
-            boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
-            boolean skipMerge = joinInfo.getSchemas()[i].getFieldCount() == 0;
-            planSteps.add("    BUILD HASH TABLE " + i + (earlyEvaluation ? "" : "(DELAYED EVALUATION)") + (skipMerge ? " (SKIP MERGE)" : ""));
-            List<String> steps = hashPlans[i].getExplainPlan().getPlanSteps();
-            for (String step : steps) {
-                planSteps.add("        " + step);
-            }
+            planSteps.addAll(subPlans[i].getPreSteps(this));
         }
-        String dynamicFilters = null;
-        int filterCount = 0;
         for (int i = 0; i < count; i++) {
-            if (keyRangeLhsExpressions[i] != null) {
-                if (filterCount == 1) {
-                    dynamicFilters = "(" + dynamicFilters + ")";
-                }
-                String filter = keyRangeLhsExpressions[i].toString() 
-                        + (useInClause(hasFilters[i]) ? " IN " : " BETWEEN MIN/MAX OF ") 
-                        + "(" + keyRangeRhsExpressions[i].toString() + ")";
-                dynamicFilters = dynamicFilters == null ? filter : (dynamicFilters + " AND (" + filter + ")");
-                filterCount++;
-            }
-        }
-        if (dynamicFilters != null) {
-            planSteps.add("    DYNAMIC SERVER FILTER BY " + dynamicFilters);
+            planSteps.addAll(subPlans[i].getPostSteps(this));
         }
-        if (joinInfo.getPostJoinFilterExpression() != null) {
+        
+        if (joinInfo != null && joinInfo.getPostJoinFilterExpression() != null) {
             planSteps.add("    AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString());
         }
-        if (joinInfo.getLimit() != null) {
+        if (joinInfo != null && joinInfo.getLimit() != null) {
             planSteps.add("    JOIN-SCANNER " + joinInfo.getLimit() + " ROW 
LIMIT");
         }
 
@@ -320,5 +310,158 @@ public class HashJoinPlan implements QueryPlan {
         return false;
     }
 
+    protected interface SubPlan {
+        public Object execute(HashJoinPlan parent) throws SQLException;
+        public void postProcess(Object result, HashJoinPlan parent) throws SQLException;
+        public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
+        public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
+    }
+    
+    public static class WhereClauseSubPlan implements SubPlan {
+        private final QueryPlan plan;
+        private final SelectStatement select;
+        private final boolean expectSingleRow;
+        
+        public WhereClauseSubPlan(QueryPlan plan, SelectStatement select, boolean expectSingleRow) {
+            this.plan = plan;
+            this.select = select;
+            this.expectSingleRow = expectSingleRow;
+        }
+
+        @Override
+        public Object execute(HashJoinPlan parent) throws SQLException {
+            List<Object> values = Lists.<Object> newArrayList();
+            ResultIterator iterator = plan.iterator();
+            RowProjector projector = plan.getProjector();
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            int columnCount = projector.getColumnCount();
+            int rowCount = 0;
+            PDataType baseType = null;
+            for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next()) {
+                if (expectSingleRow && rowCount >= 1)
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+                
+                if (columnCount == 1) {
+                    ColumnProjector columnProjector = projector.getColumnProjector(0);
+                    baseType = columnProjector.getExpression().getDataType();
+                    Object value = columnProjector.getValue(tuple, baseType, ptr);
+                    values.add(value);
+                } else {
+                    List<Expression> expressions = Lists.<Expression>newArrayListWithExpectedSize(columnCount);
+                    for (int i = 0; i < columnCount; i++) {
+                        ColumnProjector columnProjector = projector.getColumnProjector(i);
+                        PDataType type = columnProjector.getExpression().getDataType();
+                        Object value = columnProjector.getValue(tuple, type, ptr);
+                        expressions.add(LiteralExpression.newConstant(value, type));
+                    }
+                    Expression expression = new RowValueConstructorExpression(expressions, true);
+                    baseType = expression.getDataType();
+                    expression.evaluate(null, ptr);
+                    values.add(baseType.toObject(ptr));
+                }
+                rowCount++;
+            }
+            
+            Object result = expectSingleRow ? (values.isEmpty() ? null : values.get(0)) : PArrayDataType.instantiatePhoenixArray(baseType, values.toArray());
+            parent.getContext().setSubqueryResult(select, result);
+            return null;
+        }
+
+        @Override
+        public void postProcess(Object result, HashJoinPlan parent) throws SQLException {
+        }
+
+        @Override
+        public List<String> getPreSteps(HashJoinPlan parent) throws SQLException {
+            List<String> steps = Lists.newArrayList();
+            steps.add("    EXECUTE " + (expectSingleRow ? "SINGLE" : "MULTIPLE") + "-ROW SUBQUERY");
+            for (String step : plan.getExplainPlan().getPlanSteps()) {
+                steps.add("        " + step);
+            }
+            return steps;
+        }
+
+        @Override
+        public List<String> getPostSteps(HashJoinPlan parent) throws SQLException {
+            return Collections.<String>emptyList();
+        }
+    }
+    
+    public static class HashSubPlan implements SubPlan {        
+        private final int index;
+        private final QueryPlan plan;
+        private final List<Expression> hashExpressions;
+        private final Expression keyRangeLhsExpression;
+        private final Expression keyRangeRhsExpression;
+        private final TupleProjector clientProjector;
+        private final boolean hasFilters;
+        
+        public HashSubPlan(int index, QueryPlan subPlan, 
+                List<Expression> hashExpressions,
+                Expression keyRangeLhsExpression, 
+                Expression keyRangeRhsExpression, 
+                TupleProjector clientProjector, boolean hasFilters) {
+            this.index = index;
+            this.plan = subPlan;
+            this.hashExpressions = hashExpressions;
+            this.keyRangeLhsExpression = keyRangeLhsExpression;
+            this.keyRangeRhsExpression = keyRangeRhsExpression;
+            this.clientProjector = clientProjector;
+            this.hasFilters = hasFilters;
+        }
+
+        @Override
+        public Object execute(HashJoinPlan parent) throws SQLException {
+            ScanRanges ranges = parent.plan.getContext().getScanRanges();
+            List<ImmutableBytesWritable> keyRangeRhsValues = null;
+            if (keyRangeRhsExpression != null) {
+                keyRangeRhsValues = Lists.<ImmutableBytesWritable>newArrayList();
+            }
+            ServerCache cache = parent.hashClient.addHashCache(ranges, plan.iterator(), 
+                    clientProjector, plan.getEstimatedSize(), hashExpressions, parent.plan.getTableRef(), keyRangeRhsExpression, keyRangeRhsValues);
+            long endTime = System.currentTimeMillis();
+            boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
+            if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) {
+                LOG.warn("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.");
+            }
+            if (keyRangeRhsValues != null) {
+                parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression, keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), hasFilters));
+            }
+            return cache;
+        }
+
+        @Override
+        public void postProcess(Object result, HashJoinPlan parent)
+                throws SQLException {
+            ServerCache cache = (ServerCache) result;
+            parent.joinInfo.getJoinIds()[index].set(cache.getId());
+            parent.dependencies.add(cache);
+        }
+
+        @Override
+        public List<String> getPreSteps(HashJoinPlan parent) throws SQLException {
+            List<String> steps = Lists.newArrayList();
+            boolean earlyEvaluation = parent.joinInfo.earlyEvaluation()[index];
+            boolean skipMerge = parent.joinInfo.getSchemas()[index].getFieldCount() == 0;
+            steps.add("    BUILD HASH TABLE " + index + (earlyEvaluation ? "" : "(DELAYED EVALUATION)") + (skipMerge ? " (SKIP MERGE)" : ""));
+            for (String step : plan.getExplainPlan().getPlanSteps()) {
+                steps.add("        " + step);
+            }
+            return steps;
+        }
+
+        @Override
+        public List<String> getPostSteps(HashJoinPlan parent) throws SQLException {
+            if (keyRangeLhsExpression == null)
+                return Collections.<String> emptyList();
+            
+            String step = "    DYNAMIC SERVER FILTER BY " + keyRangeLhsExpression.toString() 
+                    + (parent.useInClause(hasFilters) ? " IN " : " BETWEEN MIN/MAX OF ") 
+                    + "(" + keyRangeRhsExpression.toString() + ")";
+            return Collections.<String> singletonList(step);
+        }
+        
+    }
 }
 
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
index 008ae7b..8e352e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ComparisonExpression.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.expression.function.InlineArrayElemRefExpression;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
@@ -112,7 +113,7 @@ public class ComparisonExpression extends BaseCompoundExpression {
         PDataType lhsExprDataType = lhsExpr.getDataType();
         PDataType rhsExprDataType = rhsExpr.getDataType();
         
-        if (lhsExpr instanceof RowValueConstructorExpression || rhsExpr instanceof RowValueConstructorExpression) {
+        if ((lhsExpr instanceof RowValueConstructorExpression || rhsExpr instanceof RowValueConstructorExpression) && !(lhsExpr instanceof InlineArrayElemRefExpression) && !(rhsExpr instanceof InlineArrayElemRefExpression)) {
             if (op == CompareOp.EQUAL || op == CompareOp.NOT_EQUAL) {
                 List<Expression> andNodes = Lists.<Expression>newArrayListWithExpectedSize(Math.max(lhsExpr.getChildren().size(), rhsExpr.getChildren().size()));
                 rewriteRVCAsEqualityExpression(lhsExpr, rhsExpr, andNodes, ptr);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index df715ad..b3be0d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -499,6 +499,14 @@ public class ParseNodeFactory {
             throw new IllegalArgumentException("Unexpcted CompareOp of " + op);
         }
     }
+    
+    public ArrayAnyComparisonNode arrayAny(ParseNode rhs, ComparisonParseNode compareNode) {
+        return new ArrayAnyComparisonNode(rhs, compareNode);
+    }
+    
+    public ArrayAllComparisonNode arrayAll(ParseNode rhs, ComparisonParseNode compareNode) {
+        return new ArrayAllComparisonNode(rhs, compareNode);
+    }
 
     public ArrayAnyComparisonNode wrapInAny(CompareOp op, ParseNode lhs, ParseNode rhs) {
         return new ArrayAnyComparisonNode(rhs, comparison(op, lhs, elementRef(Arrays.<ParseNode>asList(rhs, literal(1)))));
@@ -597,8 +605,14 @@ public class ParseNodeFactory {
                 statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
     }
 
-    public SubqueryParseNode subquery(SelectStatement select) {
-        return new SubqueryParseNode(select);
+    public SelectStatement select(SelectStatement statement, LimitNode limit) {
+        return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
+                statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), limit,
+                statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+    }
+
+    public SubqueryParseNode subquery(SelectStatement select, boolean expectSingleRow) {
+        return new SubqueryParseNode(select, expectSingleRow);
     }
 
     public LimitNode limit(BindParseNode b) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index 479fddd..6494752 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -348,6 +348,17 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
     }
 
     @Override
+    public ParseNode visitLeave(final InParseNode node, List<ParseNode> nodes) throws SQLException {
+        ParseNode normNode = leaveCompoundNode(node, nodes, new CompoundNodeFactory() {
+            @Override
+            public ParseNode createNode(List<ParseNode> children) {
+                return NODE_FACTORY.in(children.get(0), children.get(1), node.isNegate());
+            }
+        });
+        return normNode;
+    }
+    
+    @Override
     public ParseNode visitLeave(final IsNullParseNode node, List<ParseNode> nodes) throws SQLException {
         return leaveCompoundNode(node, nodes, new CompoundNodeFactory() {
             @Override
@@ -429,6 +440,11 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
     }
 
     @Override
+    public ParseNode visit(SubqueryParseNode node) throws SQLException {
+        return node;
+    }
+    
+    @Override
     public List<ParseNode> newElementList(int size) {
         nodeCount += size;
         return new ArrayList<ParseNode>(size);
@@ -541,13 +557,25 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
        }
 
        @Override
-    public ParseNode visitLeave(ArrayAnyComparisonNode node, List<ParseNode> l) throws SQLException {
-        return node;
+    public ParseNode visitLeave(ArrayAnyComparisonNode node, final List<ParseNode> nodes) throws SQLException {
+        ParseNode normNode = leaveCompoundNode(node, nodes, new CompoundNodeFactory() {
+            @Override
+            public ParseNode createNode(List<ParseNode> children) {
+                return NODE_FACTORY.arrayAny(nodes.get(0), (ComparisonParseNode) nodes.get(1));
+            }
+        });
+        return normNode;
     }
 
     @Override
-    public ParseNode visitLeave(ArrayAllComparisonNode node, List<ParseNode> l) throws SQLException {
-        return node;
+    public ParseNode visitLeave(ArrayAllComparisonNode node, final List<ParseNode> nodes) throws SQLException {
+        ParseNode normNode = leaveCompoundNode(node, nodes, new CompoundNodeFactory() {
+            @Override
+            public ParseNode createNode(List<ParseNode> children) {
+                return NODE_FACTORY.arrayAll(nodes.get(0), (ComparisonParseNode) nodes.get(1));
+            }
+        });
+        return normNode;
     }
  
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeVisitor.java
index 40354fe..01925ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeVisitor.java
@@ -81,6 +81,9 @@ public interface ParseNodeVisitor<E> {
     public boolean visitEnter(InListParseNode node) throws SQLException;
     public E visitLeave(InListParseNode node, List<E> l) throws SQLException;
     
+    public boolean visitEnter(InParseNode node) throws SQLException;
+    public E visitLeave(InParseNode node, List<E> l) throws SQLException;
+    
     public boolean visitEnter(IsNullParseNode node) throws SQLException;
     public E visitLeave(IsNullParseNode node, List<E> l) throws SQLException;
     
@@ -89,7 +92,8 @@ public interface ParseNodeVisitor<E> {
     public E visit(BindParseNode node) throws SQLException;
     public E visit(WildcardParseNode node) throws SQLException;
     public E visit(TableWildcardParseNode node) throws SQLException;
-    public E visit(FamilyWildcardParseNode node) throws SQLException;  
+    public E visit(FamilyWildcardParseNode node) throws SQLException;
+    public E visit(SubqueryParseNode node) throws SQLException;
     public E visit(ParseNode node) throws SQLException;  
     
     public boolean visitEnter(StringConcatParseNode node) throws SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
index 194f3bc..ab06d46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
@@ -183,4 +183,17 @@ public class SelectStatementRewriter extends ParseNodeRewriter {
     public ParseNode visitLeave(InListParseNode node, List<ParseNode> c) throws SQLException {
         return c.isEmpty() ? null : node;
     }
+    
+    @Override
+    public boolean visitEnter(InParseNode node) throws SQLException {
+        if (removeNodes.contains(node)) {
+            return false;
+        }
+        return true;
+    }
+    
+    @Override
+    public ParseNode visitLeave(InParseNode node, List<ParseNode> c) throws SQLException {
+        return c.isEmpty() ? null : node;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/StatelessTraverseAllParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/StatelessTraverseAllParseNodeVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/StatelessTraverseAllParseNodeVisitor.java
index 0be5e01..5e9f727 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/StatelessTraverseAllParseNodeVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/StatelessTraverseAllParseNodeVisitor.java
@@ -91,6 +91,11 @@ public class StatelessTraverseAllParseNodeVisitor extends TraverseAllParseNodeVi
     public Void visitLeave(InListParseNode node, List<Void> l) throws SQLException {
         return null;
     }
+    
+    @Override
+    public Void visitLeave(InParseNode node, List<Void> l) throws SQLException {
+        return null;
+    }
 
     @Override
     public Void visitLeave(StringConcatParseNode node, List<Void> l) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/SubqueryParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SubqueryParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SubqueryParseNode.java
index a3bad0d..92c5284 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SubqueryParseNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SubqueryParseNode.java
@@ -30,14 +30,20 @@ import java.sql.SQLException;
  */
 public class SubqueryParseNode extends TerminalParseNode {
     private final SelectStatement select;
+    private final boolean expectSingleRow;
 
-    SubqueryParseNode(SelectStatement select) {
+    SubqueryParseNode(SelectStatement select, boolean expectSingleRow) {
         this.select = select;
+        this.expectSingleRow = expectSingleRow;
     }
     
     public SelectStatement getSelectNode() {
         return select;
     }
+    
+    public boolean expectSingleRow() {
+        return expectSingleRow;
+    }
 
     @Override
     public <T> T accept(ParseNodeVisitor<T> visitor) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseAllParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseAllParseNodeVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseAllParseNodeVisitor.java
index af20278..d6b444f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseAllParseNodeVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseAllParseNodeVisitor.java
@@ -75,6 +75,11 @@ public abstract class TraverseAllParseNodeVisitor<T> extends BaseParseNodeVisito
     }
     
     @Override
+    public boolean visitEnter(InParseNode node) throws SQLException {
+        return true;
+    }
+    
+    @Override
     public boolean visitEnter(IsNullParseNode node) throws SQLException {
         return true;
     }
@@ -143,6 +148,11 @@ public abstract class TraverseAllParseNodeVisitor<T> extends BaseParseNodeVisito
     public T visit(FamilyWildcardParseNode node) throws SQLException {
         return null;
     }
+
+    @Override
+    public T visit(SubqueryParseNode node) throws SQLException {
+        return null;
+    }
     
     @Override
     public boolean visitEnter(StringConcatParseNode node) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
index 4c0fbea..37be462 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/TraverseNoParseNodeVisitor.java
@@ -100,6 +100,16 @@ public abstract class TraverseNoParseNodeVisitor<T> extends BaseParseNodeVisitor
     }
     
     @Override
+    public boolean visitEnter(InParseNode node) throws SQLException {
+        return false;
+    }
+    
+    @Override
+    public T visitLeave(InParseNode node, List<T> l) throws SQLException {
+        return null;
+    }
+    
+    @Override
     public boolean visitEnter(IsNullParseNode node) throws SQLException {
         return false;
     }
@@ -138,6 +148,11 @@ public abstract class TraverseNoParseNodeVisitor<T> extends BaseParseNodeVisitor
     public T visit(FamilyWildcardParseNode node) throws SQLException {
         return null;
     }
+    
+    @Override
+    public T visit(SubqueryParseNode node) throws SQLException {
+        return null;
+    }
 
     @Override
     public T visitLeave(AndParseNode node, List<T> l) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
index 95c869a..a08b0e3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/JoinQueryCompilerTest.java
@@ -59,10 +59,10 @@ public class JoinQueryCompilerTest extends BaseConnectionlessQueryTest {
         assertEquals(
                        "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
                        "    SERVER FILTER BY FIRST KEY ONLY\n" +
-                       "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                       "    PARALLEL EQUI/SEMI/ANTI-JOIN 1 TABLES:\n" +
                        "    BUILD HASH TABLE 0\n" +
                        "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + 
JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
-                       "            PARALLEL EQUI-JOIN 2 HASH TABLES:\n" +
+                       "            PARALLEL EQUI/SEMI/ANTI-JOIN 2 TABLES:\n" +
                        "            BUILD HASH TABLE 0\n" +
                        "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " 
+ JOIN_CUSTOMER_TABLE_DISPLAY_NAME + "\n" +
                        "                    SERVER FILTER BY NAME LIKE 'C%'\n" 
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9dd3bc74/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b8497c5..59eb70b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -401,33 +401,33 @@ public abstract class BaseTest {
         builder.put("SumDoubleTest","create table SumDoubleTest" +
                 "   (id varchar not null primary key, d DOUBLE, f FLOAT, ud 
UNSIGNED_DOUBLE, uf UNSIGNED_FLOAT, i integer, de decimal)");
         builder.put(JOIN_ORDER_TABLE_FULL_NAME, "create table " + JOIN_ORDER_TABLE_FULL_NAME +
-                "   (\"order_id\" char(15) not null primary key, " +
-                "    \"customer_id\" char(10), " +
-                "    \"item_id\" char(10), " +
+                "   (\"order_id\" varchar(15) not null primary key, " +
+                "    \"customer_id\" varchar(10), " +
+                "    \"item_id\" varchar(10), " +
                 "    price integer, " +
                 "    quantity integer, " +
                 "    date timestamp)");
         builder.put(JOIN_CUSTOMER_TABLE_FULL_NAME, "create table " + JOIN_CUSTOMER_TABLE_FULL_NAME +
-                "   (\"customer_id\" char(10) not null primary key, " +
+                "   (\"customer_id\" varchar(10) not null primary key, " +
                 "    name varchar, " +
-                "    phone char(12), " +
+                "    phone varchar(12), " +
                 "    address varchar, " +
-                "    loc_id char(5), " +
+                "    loc_id varchar(5), " +
                 "    date date)");
         builder.put(JOIN_ITEM_TABLE_FULL_NAME, "create table " + JOIN_ITEM_TABLE_FULL_NAME +
-                "   (\"item_id\" char(10) not null primary key, " +
+                "   (\"item_id\" varchar(10) not null primary key, " +
                 "    name varchar, " +
                 "    price integer, " +
                 "    discount1 integer, " +
                 "    discount2 integer, " +
-                "    \"supplier_id\" char(10), " +
+                "    \"supplier_id\" varchar(10), " +
                 "    description varchar)");
         builder.put(JOIN_SUPPLIER_TABLE_FULL_NAME, "create table " + JOIN_SUPPLIER_TABLE_FULL_NAME +
-                "   (\"supplier_id\" char(10) not null primary key, " +
+                "   (\"supplier_id\" varchar(10) not null primary key, " +
                 "    name varchar, " +
-                "    phone char(12), " +
+                "    phone varchar(12), " +
                 "    address varchar, " +
-                "    loc_id char(5))");
+                "    loc_id varchar(5))");
         tableDDLMap = builder.build();
     }
     
