Author: rohini
Date: Wed Sep  2 21:56:58 2015
New Revision: 1700912

URL: http://svn.apache.org/r1700912
Log:
PIG-4315: MergeJoin or Split followed by order by gives NPE in Tez (rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep  2 21:56:58 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4315: MergeJoin or Split followed by order by gives NPE in Tez (rohini)
+
 PIG-4654: Reduce tez memory.reserve-fraction and clear spillables for better memory utilization (rohini)
 
 PIG-4628: Pig 0.14 job with order by fails in mapreduce mode with Oozie (knoguchi)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Wed Sep  2 21:56:58 2015
@@ -193,10 +193,8 @@ public class TezOperPlan extends Operato
     public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException {
         List<TezOperator> list = new ArrayList<TezOperator>();
         list.add(root);
-        int prevSize = 0;
         int pos = 0;
-        while (list.size() > prevSize) {
-            prevSize = list.size();
+        while (list.size() > pos) {
             TezOperator node = list.get(pos);
             if (getSuccessors(node)!=null) {
                 for (TezOperator succ : getSuccessors(node)) {
@@ -243,11 +241,11 @@ public class TezOperPlan extends Operato
         }
     }
 
-    // This method is used in PigGraceShuffleVertexManager to get a list of 
grandparents. 
+    // This method is used in PigGraceShuffleVertexManager to get a list of 
grandparents.
     // Also need to exclude grandparents which also a parent (a is both parent 
and grandparent in the diagram below)
     //    a   ->    c
     //      \  b  /
-    // 
+    //
     public static List<TezOperator> getGrandParentsForGraceParallelism(TezOperPlan tezPlan, TezOperator op) {
         List<TezOperator> grandParents = new ArrayList<TezOperator>();
         List<TezOperator> preds = tezPlan.getPredecessors(op);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 Wed Sep  2 21:56:58 2015
@@ -73,7 +73,7 @@ public class POFRJoinTez extends POFRJoi
 
     @Override
     public void replaceInput(String oldInputKey, String newInputKey) {
-        if (inputKeys.remove(oldInputKey)) {
+        while (inputKeys.remove(oldInputKey)) {
             inputKeys.add(newInputKey);
         }
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 Wed Sep  2 21:56:58 2015
@@ -73,7 +73,7 @@ public class POShuffleTezLoad extends PO
 
     @Override
     public void replaceInput(String oldInputKey, String newInputKey) {
-        if (inputKeys.remove(oldInputKey)) {
+        while (inputKeys.remove(oldInputKey)) {
             inputKeys.add(newInputKey);
         }
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
 Wed Sep  2 21:56:58 2015
@@ -71,7 +71,7 @@ public class POShuffledValueInputTez ext
 
     @Override
     public void replaceInput(String oldInputKey, String newInputKey) {
-        if (inputKeys.remove(oldInputKey)) {
+        while (inputKeys.remove(oldInputKey)) {
             inputKeys.add(newInputKey);
         }
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
 Wed Sep  2 21:56:58 2015
@@ -96,7 +96,7 @@ public class POValueOutputTez extends Ph
 
     @Override
     public void replaceOutput(String oldOutputKey, String newOutputKey) {
-        if (outputKeys.remove(oldOutputKey)) {
+        while (outputKeys.remove(oldOutputKey)) {
             outputKeys.add(newOutputKey);
         }
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 Wed Sep  2 21:56:58 2015
@@ -23,20 +23,14 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.commons.lang.ArrayUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -214,22 +208,12 @@ public class MultiQueryOptimizerTez exte
 
         if (plan.getPredecessors(splittee) != null) {
             for (TezOperator pred : new 
ArrayList<TezOperator>(plan.getPredecessors(splittee))) {
-                List<TezOutput> tezOutputs = 
PlanHelper.getPhysicalOperators(pred.plan,
-                        TezOutput.class);
-                for (TezOutput tezOut : tezOutputs) {
-                    if (ArrayUtils.contains(tezOut.getTezOutputs(), 
spliteeKey)) {
-                        tezOut.replaceOutput(spliteeKey, splitterKey);
-                    }
-                }
-
                 TezEdgeDescriptor edge = 
pred.outEdges.remove(splittee.getOperatorKey());
                 if (edge == null) {
                     throw new VisitorException("Edge description is empty");
                 }
-                pred.outEdges.put(splitter.getOperatorKey(), edge);
-                splitter.inEdges.put(pred.getOperatorKey(), edge);
                 plan.disconnect(pred, splittee);
-                plan.connect(pred, splitter);
+                TezCompilerUtil.connectTezOpToNewSuccesor(plan, pred, 
splitter, edge, spliteeKey);
             }
         }
 
@@ -244,27 +228,10 @@ public class MultiQueryOptimizerTez exte
 
                 // Do not connect again in case of self join/cross/cogroup or 
union
                 if (splitterSuccs == null || 
!splitterSuccs.contains(succTezOperator)) {
-                    TezCompilerUtil.connect(plan, splitter, succTezOperator, 
edge);
+                    TezCompilerUtil.connectTezOpToNewPredecessor(plan, 
succTezOperator, splitter, edge, null);
                 }
 
-                try {
-                    List<TezInput> inputs = 
PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class);
-                    for (TezInput input : inputs) {
-                        input.replaceInput(spliteeKey,
-                                splitterKey);
-                    }
-                    List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class);
-                    for (POUserFunc userFunc : userFuncs) {
-                        if (userFunc.getFunc() instanceof ReadScalarsTez) {
-                            TezInput tezInput = (TezInput)userFunc.getFunc();
-                            tezInput.replaceInput(spliteeKey,
-                                    splitterKey);
-                            
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
-                        }
-                    }
-                } catch (VisitorException e) {
-                    throw new PlanException(e);
-                }
+                TezCompilerUtil.replaceInput(succTezOperator, spliteeKey, 
splitterKey);
 
                 if (succTezOperator.isUnion()) {
                     int index = 
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Wed Sep  2 21:56:58 2015
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.commons.lang.ArrayUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -43,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -274,14 +274,9 @@ public class UnionOptimizer extends TezO
                 }
 
                 for (TezOperator actualPred : actualPreds) {
-                    List<TezOutput> tezOutputs = 
PlanHelper.getPhysicalOperators(actualPred.plan,
-                                    TezOutput.class);
 
-                    for (TezOutput tezOut : tezOutputs) {
-                        if (ArrayUtils.contains(tezOut.getTezOutputs(), 
unionOpKey)) {
-                            tezOut.replaceOutput(unionOpKey, 
splitPredKey.toString());
-                        }
-                    }
+                    TezCompilerUtil.replaceOutput(actualPred, unionOpKey, 
splitPredKey.toString());
+
                     TezEdgeDescriptor edge = 
actualPred.outEdges.remove(unionOp.getOperatorKey());
                     if (edge == null) {
                         throw new VisitorException("Edge description is 
empty");
@@ -343,27 +338,8 @@ public class UnionOptimizer extends TezO
                 }
 
                 for (TezOperator actualSucc : actualSuccs) {
-                    LinkedList<TezInput> inputs = 
PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
-                    for (TezInput tezInput : inputs) {
-                        for (String inputKey : tezInput.getTezInputs()) {
-                            if (inputKey.equals(unionOpKey)) {
-                                tezInput.replaceInput(inputKey, 
splitPredOpKey);
-                            }
-                        }
-                    }
 
-                    List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class);
-                    for (POUserFunc userFunc : userFuncs) {
-                        if (userFunc.getFunc() instanceof ReadScalarsTez) {
-                            TezInput tezInput = (TezInput)userFunc.getFunc();
-                            for (String inputKey : tezInput.getTezInputs()) {
-                                if (inputKey.equals(unionOpKey)) {
-                                    tezInput.replaceInput(inputKey, 
splitPredOpKey);
-                                    
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
-                                }
-                            }
-                        }
-                    }
+                    TezCompilerUtil.replaceInput(succ, unionOpKey, 
splitPredOpKey);
 
                     TezEdgeDescriptor edge = 
actualSucc.inEdges.remove(unionOp.getOperatorKey());
                     if (edge == null) {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 Wed Sep  2 21:56:58 2015
@@ -22,10 +22,12 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -37,6 +39,9 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -123,19 +128,73 @@ public class TezCompilerUtil {
 
     static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
         plan.connect(from, to);
-        if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) {
-            PhysicalOperator leaf = from.plan.getLeaves().get(0);
-            // It could be POStoreTez incase of sampling job in order by
-            if (leaf instanceof POLocalRearrangeTez) {
-                POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
-                lr.setOutputKey(to.getOperatorKey().toString());
-            }
-        }
+
         // Add edge descriptors to old and new operators
         to.inEdges.put(from.getOperatorKey(), edge);
         from.outEdges.put(to.getOperatorKey(), edge);
     }
 
+    static public void connectTezOpToNewPredecessor(TezOperPlan plan,
+            TezOperator tezOp, TezOperator newPredecessor,
+            TezEdgeDescriptor edge, String oldInputKey) throws PlanException {
+        plan.connect(newPredecessor, tezOp);
+        // Add edge descriptors to old and new operators
+        tezOp.inEdges.put(newPredecessor.getOperatorKey(), edge);
+        newPredecessor.outEdges.put(tezOp.getOperatorKey(), edge);
+
+        if (oldInputKey != null) {
+            replaceInput(tezOp, oldInputKey, newPredecessor.getOperatorKey().toString());
+        }
+    }
+
+    public static void replaceInput(TezOperator tezOp, String oldInputKey,
+            String newInputKey) throws PlanException {
+        try {
+            List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
+            for (TezInput input : inputs) {
+                input.replaceInput(oldInputKey, newInputKey);
+            }
+            List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+            for (POUserFunc userFunc : userFuncs) {
+                if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                    TezInput input = (TezInput)userFunc.getFunc();
+                    input.replaceInput(oldInputKey, newInputKey);
+                    userFunc.getFuncSpec().setCtorArgs(input.getTezInputs());
+                }
+            }
+        } catch (VisitorException e) {
+            throw new PlanException(e);
+        }
+    }
+
+    static public void connectTezOpToNewSuccesor(TezOperPlan plan,
+            TezOperator tezOp, TezOperator newSuccessor,
+            TezEdgeDescriptor edge, String oldOutputKey) throws PlanException {
+        plan.connect(tezOp, newSuccessor);
+        // Add edge descriptors to old and new operators
+        newSuccessor.inEdges.put(tezOp.getOperatorKey(), edge);
+        tezOp.outEdges.put(newSuccessor.getOperatorKey(), edge);
+
+        if (oldOutputKey != null) {
+            replaceOutput(tezOp, oldOutputKey, newSuccessor.getOperatorKey().toString());
+        }
+    }
+
+    public static void replaceOutput(TezOperator tezOp, String oldOutputKey,
+            String newOutputKey) throws PlanException {
+        try {
+            List<TezOutput> tezOutputs = PlanHelper.getPhysicalOperators(tezOp.plan,
+                    TezOutput.class);
+            for (TezOutput tezOut : tezOutputs) {
+                if (ArrayUtils.contains(tezOut.getTezOutputs(), oldOutputKey)) {
+                    tezOut.replaceOutput(oldOutputKey, newOutputKey);
+                }
+            }
+        } catch (VisitorException e) {
+            throw new PlanException(e);
+        }
+    }
+
     static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
         PhysicalPlan forEachPlan = new PhysicalPlan();
         forEachPlan.add(project);

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1700912&r1=1700911&r2=1700912&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Wed Sep  2 
21:56:58 2015
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -49,7 +48,6 @@ import org.apache.pig.data.Tuple;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -835,7 +833,6 @@ public class TestHBaseStorage {
      */
     @Test
     public void testMergeJoin() throws IOException {
-        Assume.assumeTrue("Skip this test for TEZ. See PIG-4315", 
pig.getPigContext().getExecType().equals(ExecType.LOCAL));
         prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
         prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
         pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
@@ -1078,7 +1075,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals(timestamp, col_a_ts);
             Assert.assertEquals(timestamp, col_b_ts);
             Assert.assertEquals(timestamp, col_c_ts);
@@ -1125,7 +1122,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals(timestamp, col_a_ts);
             Assert.assertEquals(timestamp, col_b_ts);
             Assert.assertEquals(timestamp, col_c_ts);
@@ -1172,7 +1169,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals(timestamp, col_a_ts);
             Assert.assertEquals(timestamp, col_b_ts);
             Assert.assertEquals(timestamp, col_c_ts);
@@ -1217,7 +1214,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals("00".substring(v.length()) + v, rowKey);
             Assert.assertEquals(i, col_a);
             Assert.assertEquals(i + 0.0, col_b, 1e-6);


Reply via email to