Author: xuefu
Date: Thu May  7 20:39:29 2015
New Revision: 1678261

URL: http://svn.apache.org/r1678261
Log:
PIG-4421: implement visitSkewedJoin in SparkCompiler (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
    pig/branches/spark/test/org/apache/pig/test/Util.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu May  7 20:39:29 2015
@@ -535,10 +535,13 @@ public class SparkLauncher extends Launc
                        List<RDD<Tuple>> rddsFromPredeSparkOper,
                        Map<Class<? extends PhysicalOperator>, POConverter> 
convertMap)
                        throws IOException {
-               RDD<Tuple> nextRDD = null;
-               List<PhysicalOperator> predecessors = plan
-                               .getPredecessors(physicalOperator);
-               List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
+        RDD<Tuple> nextRDD = null;
+        List<PhysicalOperator> predecessors = plan
+                .getPredecessors(physicalOperator);
+        if (predecessors != null) {
+            Collections.sort(predecessors);
+        }
+        List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
                if (predecessors != null) {
                        for (PhysicalOperator predecessor : predecessors) {
                                physicalToRDD(plan, predecessor, rdds, 
rddsFromPredeSparkOper,

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu May  7 20:39:29 2015
@@ -600,10 +600,26 @@ public class SparkCompiler extends PhyPl
                }
        }
 
-       @Override
-       public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
-               // TODO
-       }
+    /**
+     * currently use regular join to replace skewedJoin
+     * Skewed join currently works with two-table inner join.
+     * More info about pig SkewedJoin, See https://wiki.apache.org/pig/PigSkewedJoinSpec
+     *
+     * @param op
+     * @throws VisitorException
+     */
+    @Override
+    public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+        try {
+            addToPlan(op);
+            phyToSparkOpMap.put(op, curSparkOp);
+        } catch (Exception e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " +
+                    op.getClass().getSimpleName();
+            throw new SparkCompilerException(msg, errCode, PigException.BUG, 
e);
+        }
+    }
 
        @Override
        public void visitFRJoin(POFRJoin op) throws VisitorException {

Modified: pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java Thu May  7 20:39:29 2015
@@ -284,6 +284,13 @@ public class TestSkewedJoin {
 
     @Test
     public void testSkewedJoinKeyPartition() throws IOException {
+        // This test relies on how the keys are distributed in Skew Join implementation.
+        // Spark engine currently implements skew join as regular join, and hence does
+        // not control key distribution.
+        // TODO: Enable this test when Spark engine implements Skew Join algorithm.
+        if (Util.isSparkExecType(cluster.getExecType()))
+            return;
+
         String outputDir = "testSkewedJoinKeyPartition";
         try{
              Util.deleteFile(cluster, outputDir);

Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1678261&r1=1678260&r2=1678261&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Thu May  7 20:39:29 2015
@@ -1402,4 +1402,12 @@ public class Util {
         });
         return parts[0]; 
     }
+
+    public static boolean isSparkExecType(ExecType execType) {
+        if (execType.name().toLowerCase().startsWith("spark")) {
+            return true;
+        }
+
+        return false;
+    }
 }


Reply via email to