Author: praveen
Date: Thu Jan 22 12:10:38 2015
New Revision: 1653820

URL: http://svn.apache.org/r1653820
Log:
PIG-4362: Make ship work with spark (liyunzhang via praveen)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
    pig/branches/spark/test/e2e/pig/tests/streaming.conf

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Jan 22 12:10:38 2015
@@ -40,13 +40,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 
 public class POStream extends PhysicalOperator {
+
     private static final long serialVersionUID = 2L;
     
-    private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
+    protected static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
 
-    private String executableManagerStr;            // String representing ExecutableManager to use
+    protected String executableManagerStr;            // String representing ExecutableManager to use
     transient private ExecutableManager executableManager;    // ExecutableManager to use
-    private StreamingCommand command;               // Actual command to be run
+    protected StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
     private boolean initialized = false;
@@ -392,4 +393,11 @@ public class POStream extends PhysicalOperator {
         this.isFetchable = isFetchable;
     }
 
+    public POStream(POStream copy){
+        super(copy);
+    }
+
+    public String getExecutableManagerStr() {
+        return executableManagerStr;
+    }
 }

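The visibility changes above (EOP_RESULT, executableManagerStr and command go from private to protected), together with the new copy constructor and getter, exist so that a backend-specific subclass can be built from an existing POStream instance. A minimal sketch of that pattern, assuming only the POStream API shown in this diff; the subclass name here is hypothetical, while the real subclass added by this commit is POStreamSpark (below):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;

    // Illustrative subclass only -- the one this commit actually adds is POStreamSpark.
    public class POStreamForSomeBackend extends POStream {
        public POStreamForSomeBackend(POStream copy) {
            super(copy);                                                // copies PhysicalOperator state
            this.command = copy.getCommand();                           // field is protected as of this patch
            this.executableManagerStr = copy.getExecutableManagerStr(); // getter added by this patch
        }
    }
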
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Jan 22 12:10:38 2015
@@ -1,17 +1,24 @@
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -38,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
@@ -56,11 +64,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.SparkStats;
 import org.apache.spark.rdd.RDD;
@@ -82,7 +92,8 @@ public class SparkLauncher extends Launcher {
     private static SparkContext sparkContext = null;
 
     public static BroadCastServer bcaster;
-
+    private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
+            .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
     // An object that handle cache calls in the operator graph. This is again
     // static because we want
     // it to be shared across SparkLaunchers. It gets cleared whenever we close
@@ -99,9 +110,6 @@ public class SparkLauncher extends Launc
 
         SchemaTupleBackend.initialize(c, pigContext);
 
-//        ObjectSerializer.serialize(c);
-        byte[] confBytes = KryoSerializer.serializeJobConf(c);
-
         // Code pulled from MapReduceLauncher
         MRCompiler mrCompiler = new MRCompiler(physicalPlan, pigContext);
         mrCompiler.compile();
@@ -121,7 +129,17 @@ public class SparkLauncher extends Launc
         }
 
         startSparkIfNeeded();
+        String currentDirectoryPath = Paths.get(".").toAbsolutePath().normalize().toString() + "/";
+        startSparkJob(pigContext, currentDirectoryPath);
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+                physicalPlan, POStore.class);
+        POStore firstStore = stores.getFirst();
+        if( firstStore != null ){
+            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext, c);
+        }
 
+        //        ObjectSerializer.serialize(c);
+        byte[] confBytes = KryoSerializer.serializeJobConf(c);
         // initialize the supported converters
         Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
 
@@ -144,21 +162,130 @@ public class SparkLauncher extends Launc
         convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
         convertMap.put(POCounter.class, new CounterConverter());
         convertMap.put(PORank.class, new RankConverter());
+        convertMap.put(POStreamSpark.class,new StreamConverter(confBytes));
 
         Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
 
         SparkStats stats = new SparkStats();
-        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
-                physicalPlan, POStore.class);
+
         for (POStore poStore : stores) {
             physicalToRDD(physicalPlan, poStore, rdds, convertMap);
             stats.addOutputInfo(poStore, 1, 1, true, c); // TODO: use real
             // values
         }
-
+        cleanUpSparkJob(pigContext,currentDirectoryPath);
         return stats;
     }
 
+    private void cleanUpSparkJob(PigContext pigContext, String currentDirectoryPath) {
+        LOG.info("clean up Spark Job");
+        boolean isLocal = System.getenv("SPARK_MASTER").equals("local");
+        if (isLocal) {
+            String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
+            if (shipFiles != null) {
+                for (String file : shipFiles.split(",")) {
+                    File shipFile = new File(file);
+                    File deleteFile = new File(currentDirectoryPath + "/" + shipFile.getName());
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete ship file result: %b", deleteFile.delete()));
+                    }
+                }
+            }
+            String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files");
+            if (cacheFiles != null) {
+                for (String file : cacheFiles.split(",")) {
+                    String fileName = extractFileName(file.trim());
+                    File deleteFile = new File(currentDirectoryPath + "/" + fileName);
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete cache file result: %b", deleteFile.delete()));
+                    }
+                }
+            }
+        }
+    }
+
+    private void startSparkJob(PigContext pigContext, String currentDirectoryPath) throws IOException {
+        LOG.info("start Spark Job");
+        String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
+        shipFiles(shipFiles, currentDirectoryPath);
+        String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files");
+        cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
+    }
+
+    private void shipFiles(String shipFiles, String currentDirectoryPath) throws IOException {
+        if (shipFiles != null) {
+            for (String file : shipFiles.split(",")) {
+                File shipFile = new File(file.trim());
+                if (shipFile.exists()) {
+                    LOG.info(String.format("shipFile:%s", shipFile));
+                    boolean isLocal = System.getenv("SPARK_MASTER").equals("local");
+                    if (isLocal) {
+                        File localFile = new File(currentDirectoryPath + "/" + shipFile.getName());
+                        if (localFile.exists()) {
+                            LOG.info(String.format("ship file %s exists, ready to delete", localFile.getAbsolutePath()));
+                            localFile.delete();
+                        } else {
+                            LOG.info(String.format("ship file %s does not exist", localFile.getAbsolutePath()));
+                        }
+                        Files.copy(shipFile.toPath(), Paths.get(localFile.getAbsolutePath()));
+                    } else {
+                        sparkContext.addFile(shipFile.toURI().toURL().toExternalForm());
+                    }
+                }
+            }
+        }
+    }
+
+    private void cacheFiles(String cacheFiles, String currentDirectoryPath, PigContext pigContext) throws IOException {
+        if (cacheFiles != null) {
+            Configuration conf = SparkUtil.newJobConf(pigContext);
+            boolean isLocal = System.getenv("SPARK_MASTER").equals("local");
+            for (String file : cacheFiles.split(",")) {
+                String fileName = extractFileName(file.trim());
+                Path src = new Path(extractFileUrl(file.trim()));
+                File tmpFile = File.createTempFile(fileName, ".tmp");
+                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+                FileSystem fs = tmpFilePath.getFileSystem(conf);
+                fs.copyToLocalFile(src, tmpFilePath);
+                tmpFile.deleteOnExit();
+                if (isLocal) {
+                    File localFile = new File(currentDirectoryPath + "/" + fileName);
+                    if (localFile.exists()) {
+                        LOG.info(String.format("cache file %s exists, ready to delete", localFile.getAbsolutePath()));
+                        localFile.delete();
+                    } else {
+                        LOG.info(String.format("cache file %s does not exist", localFile.getAbsolutePath()));
+                    }
+                    Files.copy(Paths.get(tmpFilePath.toString()), Paths.get(localFile.getAbsolutePath()));
+                } else {
+                    sparkContext.addFile(tmpFile.toURI().toURL().toExternalForm());
+                }
+            }
+        }
+    }
+
+    private String extractFileName(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] : null;
+        if (fileName == null) {
+            throw new RuntimeException("cache file has an invalid format, file: " + cacheFileUrl);
+        } else {
+            LOG.debug("cache file name is valid: " + cacheFileUrl);
+            return fileName;
+        }
+    }
+
+    private String extractFileUrl(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileUrl = tmpAry != null && tmpAry.length == 2 ? tmpAry[0] : null;
+        if (fileUrl == null) {
+            throw new RuntimeException("cache file has an invalid format, file: " + cacheFileUrl);
+        } else {
+            LOG.debug("cache file url is valid: " + cacheFileUrl);
+            return fileUrl;
+        }
+    }
+
     private static void startSparkIfNeeded() throws PigException {
         if (sparkContext == null) {
             String master = System.getenv("SPARK_MASTER");
@@ -249,6 +376,11 @@ public class SparkLauncher extends Launcher {
             }
         }
 
+        if (physicalOperator instanceof POStream) {
+            POStream poStream = (POStream) physicalOperator;
+            physicalOperator = new POStreamSpark(poStream);
+        }
+
         POConverter converter = convertMap.get(physicalOperator.getClass());
         if (converter == null) {
             throw new IllegalArgumentException(

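A note on the cache-file handling above: it relies on the same "url#name" convention as Hadoop's distributed cache, where the part before the "#" is the source path and the part after it is the name the file gets in the job's working directory (the test change below exercises this with cache(':INPATH:/nameMap/part-00000#nameMap')). A standalone sketch of the split performed by extractFileUrl()/extractFileName(); the CacheFileSpec class and its parse() method are hypothetical names, not part of the patch:

    // Hypothetical helper mirroring extractFileUrl()/extractFileName() above.
    public final class CacheFileSpec {
        public final String url;   // source, e.g. ":INPATH:/nameMap/part-00000"
        public final String name;  // local name, e.g. "nameMap"

        private CacheFileSpec(String url, String name) {
            this.url = url;
            this.name = name;
        }

        // Splits "url#name"; anything else is rejected, matching the
        // RuntimeException thrown by the patch's extract methods.
        public static CacheFileSpec parse(String cacheFileUrl) {
            String[] parts = cacheFileUrl.trim().split("#");
            if (parts.length != 2) {
                throw new IllegalArgumentException("cache file has an invalid format: " + cacheFileUrl);
            }
            return new CacheFileSpec(parts[0], parts[1]);
        }
    }

For example, CacheFileSpec.parse(":INPATH:/nameMap/part-00000#nameMap") would yield url = ":INPATH:/nameMap/part-00000" and name = "nameMap".
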
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java Thu Jan 22 12:10:38 2015
@@ -20,41 +20,42 @@ abstract class POOutputConsumerIterator
     abstract protected Result getNextResult() throws ExecException;
 
     private void readNext() {
-        try {
-            if (result != null && !returned) {
-                return;
-            }
-            // see PigGenericMapBase
-            if (result == null) {
-                if (!input.hasNext()) {
-                    finished = true;
+        while (true) {
+            try {
+                if (result != null && !returned) {
                     return;
                 }
-                Tuple v1 = input.next();
-                attach(v1);
-            }
-            result = getNextResult();
-            returned = false;
-            switch (result.returnStatus) {
-            case POStatus.STATUS_OK:
+                // see PigGenericMapBase
+                if (result == null) {
+                    if (!input.hasNext()) {
+                        finished = true;
+
+                        return;
+                    }
+                    Tuple v1 = input.next();
+                    attach(v1);
+                }
+                result = getNextResult();
                 returned = false;
-                break;
-            case POStatus.STATUS_NULL:
-                returned = true; // skip: see PigGenericMapBase
-                readNext();
-                break;
-            case POStatus.STATUS_EOP:
-                finished = !input.hasNext();
-                if (!finished) {
-                    result = null;
-                    readNext();
+                switch (result.returnStatus) {
+                    case POStatus.STATUS_OK:
+                        returned = false;
+                        break;
+                    case POStatus.STATUS_NULL:
+                        returned = true; // skip: see PigGenericMapBase
+                        break;
+                    case POStatus.STATUS_EOP:
+                        finished = !input.hasNext();
+                        if (!finished) {
+                            result = null;
+                        }
+                        break;
+                    case POStatus.STATUS_ERR:
+                        throw new RuntimeException("Error while processing " + result);
                 }
-                break;
-            case POStatus.STATUS_ERR:
-                throw new RuntimeException("Error while processing " + result);
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
             }
-        } catch (ExecException e) {
-            throw new RuntimeException(e);
         }
     }
 

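The rewrite above replaces the recursive calls that readNext() used to make for STATUS_NULL and consumable STATUS_EOP results with a while (true) loop: a long run of skippable records previously grew the call stack by one frame each, while the loop runs in constant stack space. A sketch of the general transformation, with hypothetical step()/shouldSkip() methods standing in for the iterator's logic:

    // Illustrative only: the recursion-to-loop rewrite applied above.
    abstract class SkippingReader {
        abstract Object step();                // produce the next raw result
        abstract boolean shouldSkip(Object r); // e.g. STATUS_NULL, or an EOP with input left

        // Before: one stack frame per skipped result -- can overflow.
        void readNextRecursive() {
            Object r = step();
            if (shouldSkip(r)) {
                readNextRecursive();
            }
        }

        // After: same behavior, constant stack depth.
        void readNextIterative() {
            while (true) {
                Object r = step();
                if (!shouldSkip(r)) {
                    return;
                }
            }
        }
    }
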
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1653820&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Thu Jan 22 12:10:38 2015
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import scala.Function1;
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.pig.data.Tuple;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class StreamConverter implements
+               POConverter<Tuple, Tuple, POStreamSpark> {
+       private static Log LOG = LogFactory.getLog(StreamConverter.class);
+       private byte[] confBytes;
+
+       public StreamConverter(byte[] confBytes) {
+               this.confBytes = confBytes;
+       }
+
+       @Override
+       public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+                       POStreamSpark poStream) throws IOException {
+               SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
+               RDD<Tuple> rdd = predecessors.get(0);
+               RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+               long count = 0;
+               try {
+                       count = rdd2.count();
+               } catch (Exception e) {
+                       System.out.println("Crash in StreamConverter :" + e);
+                       LOG.info("Crash in StreamConverter ", e);
+               }
+               StreamFunction streamFunction = new StreamFunction(poStream, count,
+                               confBytes);
+               return rdd2.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+       }
+
+       private static class StreamFunction implements
+                       FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+               private POStreamSpark poStream;
+               private long total_limit;
+               private long current_val;
+               private boolean proceed = false;
+               private transient JobConf jobConf;
+               private byte[] confBytes;
+
+               private StreamFunction(POStreamSpark poStream, long total_limit,
+                               byte[] confBytes) {
+                       this.poStream = poStream;
+                       this.total_limit = total_limit;
+                       this.current_val = 0;
+                       this.confBytes = confBytes;
+               }
+
+               void initializeJobConf() {
+                       if (this.jobConf == null) {
+                               this.jobConf = KryoSerializer
+                                               .deserializeJobConf(this.confBytes);
+                               PigMapReduce.sJobConfInternal.set(jobConf);
+                               try {
+                                       MapRedUtil.setupUDFContext(jobConf);
+                                       PigContext pc = (PigContext) ObjectSerializer
+                                                       .deserialize(jobConf.get("pig.pigContext"));
+                                       SchemaTupleBackend.initialize(jobConf, pc);
+
+                               } catch (IOException ioe) {
+                                       String msg = "Problem while configuring UDFContext from StreamConverter.";
+                                       throw new RuntimeException(msg, ioe);
+                               }
+                       }
+               }
+
+               public Iterable<Tuple> call(final Iterator<Tuple> input) {
+                       initializeJobConf();
+                       return new Iterable<Tuple>() {
+                               @Override
+                               public Iterator<Tuple> iterator() {
+                                       return new POOutputConsumerIterator(input) {
+
+                                               @Override
+                                               protected void attach(Tuple tuple) {
+                                                       poStream.setInputs(null);
+                                                       poStream.attachInput(tuple);
+                                                       try {
+                                                               current_val = current_val + 1;
+                                                               if (current_val == total_limit) {
+                                                                       proceed = true;
+                                                               } else {
+                                                                       proceed = false;
+                                                               }
+
+                                                       } catch (Exception e) {
+                                                               System.out.println("Crash in StreamConverter :" + e);
+                                                               LOG.info("Crash in StreamConverter ", e);
+                                                       }
+                                               }
+
+                                               @Override
+                                               protected Result getNextResult() throws ExecException {
+                                                       Result result = poStream.getNextTuple(proceed);
+                                                       return result;
+                                               }
+                                       };
+                               }
+                       };
+               }
+       }
+}

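StreamFunction above needs to know when it is attaching the final tuple, so that poStream.getNextTuple(proceed) can tell the external binary that input is finished and drain its remaining output; that is why convert() coalesces the RDD to a single partition and counts it up front, at the cost of an extra pass over the data. A toy model of that last-record flag (class and variable names here are mine, not the patch's):

    import java.util.Arrays;
    import java.util.List;

    // Toy model of StreamFunction's end-of-input detection: with the total
    // record count known in advance, the final element can be flagged, the
    // way "proceed" is set when current_val reaches total_limit above.
    public class LastRecordFlagDemo {
        public static void main(String[] args) {
            List<String> records = Arrays.asList("a", "b", "c"); // stands in for the single partition
            long total = records.size();                         // stands in for rdd2.count()
            long seen = 0;
            for (String r : records) {
                boolean proceed = (++seen == total);             // true only for the last tuple
                System.out.println(r + " proceed=" + proceed);
            }
        }
    }
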
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java?rev=1653820&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java Thu Jan 22 12:10:38 2015
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+
+public class POStreamSpark extends POStream {
+       public POStreamSpark(POStream copy) {
+               super(copy);
+               this.command = copy.getCommand();
+               this.executableManagerStr = copy.getExecutableManagerStr();
+       }
+
+       public Result getNextTuple(boolean proceed) throws ExecException {
+               // The POStream Operator works with ExecutableManager to
+               // send input to the streaming binary and to get output
+               // from it. To achieve a tuple oriented behavior, two queues
+               // are used - one for output from the binary and one for
+               // input to the binary. In each getNext() call:
+               // 1) If there is no more output expected from the binary, an EOP is
+               // sent to successor
+               // 2) If there is any output from the binary in the queue, it is passed
+               // down to the successor
+               // 3) if neither of these two are true and if it is possible to
+               // send input to the binary, then the next tuple from the
+               // predecessor is got and passed to the binary
+               try {
+                       // if we are being called AFTER all output from the streaming
+                       // binary has already been sent to us then just return EOP
+                       // The "allOutputFromBinaryProcessed" flag is set when we see
+                       // an EOS (End of Stream output) from streaming binary
+                       if (allOutputFromBinaryProcessed) {
+                               return new Result(POStatus.STATUS_EOP, null);
+                       }
+
+                       // if we are here AFTER all map() calls have been completed
+                       // AND AFTER we process all possible input to be sent to the
+                       // streaming binary, then all we want to do is read output from
+                       // the streaming binary
+                       if (allInputFromPredecessorConsumed) {
+                               Result r = binaryOutputQueue.take();
+                               if (r.returnStatus == POStatus.STATUS_EOS) {
+                                       // If we received EOS, it means all output
+                                       // from the streaming binary has been sent to us
+                                       // So we can send an EOP to the successor in
+                                       // the pipeline. Also since we are being called
+                                       // after all input from predecessor has been processed
+                                       // it means we got here from a call from close() in
+                                       // map or reduce. So once we send this EOP down,
+                                       // getNext() in POStream should never be called. So
+                                       // we don't need to set any flag noting we saw all output
+                                       // from binary
+                                       r = EOP_RESULT;
+                               } else if (r.returnStatus == POStatus.STATUS_OK)
+                                       illustratorMarkup(r.result, r.result, 0);
+                               return (r);
+                       }
+
+                       // if we are here, we haven't consumed all input to be sent
+                       // to the streaming binary - check if we are being called
+                       // from close() on the map or reduce
+                       Result r = getNextHelper((Tuple) null);
+                       if (isFetchable() || proceed) {
+                               if (r.returnStatus == POStatus.STATUS_EOP) {
+                                       // we have now seen *ALL* possible input
+                                       // check if we ever had any real input
+                                       // in the course of the map/reduce - if we did
+                                       // then "initialized" will be true. If not, just
+                                       // send EOP down.
+                                       if (getInitialized()) {
+                                               // signal End of ALL input to the Executable Manager's
+                                               // Input handler thread
+                                               binaryInputQueue.put(r);
+                                               // note this state for future calls
+                                               allInputFromPredecessorConsumed = true;
+                                               // look for output from binary
+                                               r = binaryOutputQueue.take();
+                                               if (r.returnStatus == POStatus.STATUS_EOS) {
+                                                       // If we received EOS, it means all output
+                                                       // from the streaming binary has been sent to us
+                                                       // So we can send an EOP to the successor in
+                                                       // the pipeline. Also since we are being called
+                                                       // after all input from predecessor has been
+                                                       // processed
+                                                       // it means we got here from a call from close() in
+                                                       // map or reduce. So once we send this EOP down,
+                                                       // getNext() in POStream should never be called. So
+                                                       // we don't need to set any flag noting we saw all
+                                                       // output
+                                                       // from binary
+                                                       r = EOP_RESULT;
+                                               }
+                                       }
+
+                               } else if (r.returnStatus == POStatus.STATUS_EOS) {
+                                       // If we received EOS, it means all output
+                                       // from the streaming binary has been sent to us
+                                       // So we can send an EOP to the successor in
+                                       // the pipeline. Also we are being called
+                                       // from close() in map or reduce (this is so because
+                                       // only then this.parentPlan.endOfAllInput is true).
+                                       // So once we send this EOP down, getNext() in POStream
+                                       // should never be called. So we don't need to set any
+                                       // flag noting we saw all output from binary
+                                       r = EOP_RESULT;
+                               } else if (r.returnStatus == POStatus.STATUS_OK)
+                                       illustratorMarkup(r.result, r.result, 0);
+                               return r;
+                       } else {
+                               // we are not being called from close() - so
+                               // we must be called from either map() or reduce()
+                               // get the next Result from helper
+                               if (r.returnStatus == POStatus.STATUS_EOS) {
+                                       // If we received EOS, it means all output
+                                       // from the streaming binary has been sent to us
+                                       // So we can send an EOP to the successor in
+                                       // the pipeline and also note this condition
+                                       // for future calls
+                                       r = EOP_RESULT;
+                                       allOutputFromBinaryProcessed = true;
+                               } else if (r.returnStatus == POStatus.STATUS_OK)
+                                       illustratorMarkup(r.result, r.result, 0);
+                               return r;
+                       }
+
+               } catch (Exception e) {
+                       int errCode = 2083;
+                       String msg = "Error while trying to get next result in POStream.";
+                       throw new ExecException(msg, errCode, PigException.BUG, e);
+               }
+       }
+}

Modified: pig/branches/spark/test/e2e/pig/tests/streaming.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/streaming.conf?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/streaming.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/streaming.conf Thu Jan 22 12:10:38 2015
@@ -79,7 +79,7 @@ store C into ':OUTPATH:';#,
                        {
                        #Section 1.1: perl script, no parameters, autoship(Section 2.1)
                         'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = foreach A generate $0, $1, $2;
@@ -90,7 +90,7 @@ store C into ':OUTPATH:';#,
                        {
                        # Section 1.2: perl script that takes parameters; explicit ship of script (Section 2.1)
                         'num' => 5,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl - -` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 1);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -102,7 +102,7 @@ store C into ':OUTPATH:';#,
                        {
                        # Section 1.3: define clause; explicit ship of script (Section 2.1)
                         'num' => 6,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -115,7 +115,7 @@ store D into ':OUTPATH:';#,
                        {
                        # Section 1.4: grouped data
                         'num' => 7,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -128,7 +128,7 @@ store D into ':OUTPATH:';#,
                        {
                        # Section 1.4: grouped and ordered data
                         'num' => 8,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -144,7 +144,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - adjacent - map side
                         'num' => 9,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -157,7 +157,7 @@ store D into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - not adjacent - map side
                         'num' => 10,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
@@ -172,7 +172,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - adjacent - reduce side
                         'num' => 11,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl') stderr('CMD1');
 define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm') stderr('CMD2');
@@ -191,7 +191,7 @@ store F into ':OUTPATH:';#,
                        # Section 1.5: multiple streaming operators - one on map and one on reduce side
                        # same alias name
                         'num' => 12,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
 define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
@@ -206,7 +206,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 1.5: multiple streaming operators - adjacent - map side
                         'num' => 13,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -232,7 +232,7 @@ store D into ':OUTPATH:';#,
                         # Section 2.1: perl script and its dependency shipped
                        # Also covers part of section 3.1: custom serializer
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -243,7 +243,7 @@ store B into ':OUTPATH:';#,
                        {
                       # Section 2.1: perl script and supported data file is shipped
                        'num' => 2,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -257,7 +257,7 @@ store E into ':OUTPATH:';#,
                        {
                       # Section 2.2: script is shipped while the supporting file is cached
                        'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q@
 define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') cache(':INPATH:/nameMap/part-00000#nameMap');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -283,7 +283,7 @@ store E into ':OUTPATH:';@,
                        {
                        # Section 3.1: use of custom deserializer
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl` output(stdout) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -294,7 +294,7 @@ store B into ':OUTPATH:';#,
                        {
                        # Section 3.1: use of custom serializer and deserializer
                        'num' => 2,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 register :FUNCPATH:/testudf.jar;
 define CMD `perl PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
@@ -307,7 +307,7 @@ store C into ':OUTPATH:';#,
                        {
                       # Section 3.3: streaming application reads from file rather than stdin
                        'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl foo -` input('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -318,7 +318,7 @@ store B into ':OUTPATH:';#,
                        {
                       # Section 3.4: streaming application writes single output to a file
                        'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl - foo nameMap` output('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -330,7 +330,7 @@ store C into ':OUTPATH:';#,
                        {
                       # Section 3.4: streaming application writes multiple outputs to file
                        'num' => 5,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin) output('sio_5_1', 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -341,7 +341,7 @@ store B into ':OUTPATH:';#,
                        {
                       # Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout
                        'num' => 6,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreamingDepend.pl - - sio_5_2` input(stdin) output(stdout, 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -362,7 +362,7 @@ store B into ':OUTPATH:';#,
                        {
                       # Section 4.3: integration with parameter substitution
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                       'pig_params' => ['-p', qq(script_name='PigStreaming.pl')],
                        'pig' => q#
 define CMD `perl $script_name - - nameMap` ship(':SCRIPTHOMEPATH:/$script_name', ':SCRIPTHOMEPATH:/nameMap');
@@ -387,7 +387,7 @@ store E into ':OUTPATH:';#,
                        {
                        # Section 5.1: load/store optimization
                        'num' => 1,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -422,7 +422,7 @@ store D into ':OUTPATH:';#,
                        {
                       # PIG-272: problem with optimization and intermediate store
                        'num' => 3,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`; 
 define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
@@ -444,7 +444,7 @@ store D into ':OUTPATH:';#,
                        {
                       # PIG-272: problem with optimization and intermediate store
                        'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 define CMD1 `perl -ne 'print $_;'`; 
 define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
@@ -472,7 +472,7 @@ store E into ':OUTPATH:';#,
                        # Make sure join with stream optimization works
             # optimization only on load side
                        'num' => 5,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = stream A through `cat` as (name:chararray, age:int, gpa:double);
@@ -485,7 +485,7 @@ store D into ':OUTPATH:';#,
                        # Make sure join with stream optimization works
             # optimization only on store side 
                        'num' => 6,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = filter A by $1 > 25;
@@ -500,7 +500,7 @@ store D into ':OUTPATH:';#,
                        # Make sure join with stream optimization works
             # optimization on load and store
                        'num' => 7,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                        'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = stream A through `cat` as (name:chararray, age:int, gpa:double);
@@ -536,7 +536,7 @@ store B into ':OUTPATH:';#,
                        # case where binary finishes normally
                 # BEFORE all input has been passed to it
                         'num' => 2,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -565,7 +565,7 @@ store D into ':OUTPATH:';#,
                 # BEFORE all input has been passed to it
                 # FIXME: in local mode
                         'num' => 4,
-                       'execonly' => 'mapred,tez',
+                       'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 B = stream A through `head -1` as (name, age, gpa);
@@ -581,7 +581,7 @@ store E into ':OUTPATH:';#,
                 # BEFORE all input has been passed to it
                 # and emits no output
                         'num' => 5,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -598,7 +598,7 @@ store D into ':OUTPATH:';#,
                 # BEFORE all input has been passed to it
                 # and emits no output
                         'num' => 6,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -613,7 +613,7 @@ store E into ':OUTPATH:';#,
                 # two stream operators one after another where first
                 # one emits no output
                         'num' => 7,
-                        'execonly' => 'mapred,tez',
+                        'execonly' => 'mapred,tez,spark',
                         'pig' => q#
 define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
 A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);

