Author: praveen
Date: Thu Jan 22 12:10:38 2015
New Revision: 1653820
URL: http://svn.apache.org/r1653820
Log:
PIG-4362: Make ship work with spark (liyunzhang via praveen)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
pig/branches/spark/test/e2e/pig/tests/streaming.conf
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Jan 22 12:10:38 2015
@@ -40,13 +40,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 public class POStream extends PhysicalOperator {
+
     private static final long serialVersionUID = 2L;
-    private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
+    protected static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
-    private String executableManagerStr; // String representing ExecutableManager to use
+    protected String executableManagerStr; // String representing ExecutableManager to use
     transient private ExecutableManager executableManager; // ExecutableManager to use
-    private StreamingCommand command; // Actual command to be run
+    protected StreamingCommand command; // Actual command to be run
     private Properties properties;
     private boolean initialized = false;
@@ -392,4 +393,11 @@ public class POStream extends PhysicalOp
         this.isFetchable = isFetchable;
     }
+    public POStream(POStream copy) {
+        super(copy);
+    }
+
+    public String getExecutableManagerStr() {
+        return executableManagerStr;
+    }
 }
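
Note: the visibility changes above (private -> protected) together with the new copy
constructor exist so that a backend-specific subclass can take over an already-built
POStream. A minimal sketch of the pattern, using only what this commit exposes (the
subclass name below is illustrative; the real subclass is POStreamSpark, added later
in this commit):

    public class MyBackendStream extends POStream {
        public MyBackendStream(POStream copy) {
            super(copy);                              // copies PhysicalOperator state
            this.command = copy.getCommand();         // reachable now that it is protected
            this.executableManagerStr = copy.getExecutableManagerStr(); // new getter
        }
    }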
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Jan 22 12:10:38 2015
@@ -1,17 +1,24 @@
package org.apache.pig.backend.hadoop.executionengine.spark;
+import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
@@ -38,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter;
@@ -56,11 +64,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.SplitConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.SparkStats;
import org.apache.spark.rdd.RDD;
@@ -82,7 +92,8 @@ public class SparkLauncher extends Launc
private static SparkContext sparkContext = null;
public static BroadCastServer bcaster;
-
+ private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
+ .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
// An object that handle cache calls in the operator graph. This is again
// static because we want
// it to be shared across SparkLaunchers. It gets cleared whenever we close
@@ -99,9 +110,6 @@ public class SparkLauncher extends Launc
SchemaTupleBackend.initialize(c, pigContext);
-// ObjectSerializer.serialize(c);
- byte[] confBytes = KryoSerializer.serializeJobConf(c);
-
// Code pulled from MapReduceLauncher
MRCompiler mrCompiler = new MRCompiler(physicalPlan, pigContext);
mrCompiler.compile();
@@ -121,7 +129,17 @@ public class SparkLauncher extends Launc
}
startSparkIfNeeded();
+ String currentDirectoryPath = Paths.get(".").toAbsolutePath().normalize().toString() + "/";
+ startSparkJob(pigContext, currentDirectoryPath);
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+ physicalPlan, POStore.class);
+ POStore firstStore = stores.getFirst();
+ if (firstStore != null) {
+ MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext, c);
+ }
+ // ObjectSerializer.serialize(c);
+ byte[] confBytes = KryoSerializer.serializeJobConf(c);
// initialize the supported converters
Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
@@ -144,21 +162,130 @@ public class SparkLauncher extends Launc
convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
convertMap.put(POCounter.class, new CounterConverter());
convertMap.put(PORank.class, new RankConverter());
+ convertMap.put(POStreamSpark.class, new StreamConverter(confBytes));
Map<OperatorKey, RDD<Tuple>> rdds = new HashMap<OperatorKey, RDD<Tuple>>();
SparkStats stats = new SparkStats();
- LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
- physicalPlan, POStore.class);
+
for (POStore poStore : stores) {
physicalToRDD(physicalPlan, poStore, rdds, convertMap);
stats.addOutputInfo(poStore, 1, 1, true, c); // TODO: use real
// values
}
-
+ cleanUpSparkJob(pigContext, currentDirectoryPath);
return stats;
}
+    private void cleanUpSparkJob(PigContext pigContext, String currentDirectoryPath) {
+        LOG.info("clean up Spark Job");
+        boolean isLocal = System.getenv("SPARK_MASTER").equals("local");
+        if (isLocal) {
+            String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
+            if (shipFiles != null) {
+                for (String file : shipFiles.split(",")) {
+                    File shipFile = new File(file);
+                    File deleteFile = new File(currentDirectoryPath + "/" + shipFile.getName());
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete ship file result: %b", deleteFile.delete()));
+                    }
+                }
+            }
+            String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files");
+            if (cacheFiles != null) {
+                for (String file : cacheFiles.split(",")) {
+                    String fileName = extractFileName(file.trim());
+                    File deleteFile = new File(currentDirectoryPath + "/" + fileName);
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete cache file result: %b", deleteFile.delete()));
+                    }
+                }
+            }
+        }
+    }
+
+    private void startSparkJob(PigContext pigContext, String currentDirectoryPath) throws IOException {
+        LOG.info("start Spark Job");
+        String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
+        shipFiles(shipFiles, currentDirectoryPath);
+        String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files");
+        cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
+    }
+
+    private void shipFiles(String shipFiles, String currentDirectoryPath) throws IOException {
+        if (shipFiles != null) {
+            for (String file : shipFiles.split(",")) {
+                File shipFile = new File(file.trim());
+                if (shipFile.exists()) {
+                    LOG.info(String.format("shipFile:%s", shipFile));
+                    boolean isLocal = System.getenv("SPARK_MASTER").equals("local");
+                    if (isLocal) {
+                        File localFile = new File(currentDirectoryPath + "/" + shipFile.getName());
+                        if (localFile.exists()) {
+                            LOG.info(String.format("ship file %s exists, ready to delete", localFile.getAbsolutePath()));
+                            localFile.delete();
+                        } else {
+                            LOG.info(String.format("ship file %s does not exist", localFile.getAbsolutePath()));
+                        }
+                        Files.copy(shipFile.toPath(), Paths.get(localFile.getAbsolutePath()));
+                    } else {
+                        sparkContext.addFile(shipFile.toURI().toURL().toExternalForm());
+                    }
+                }
+            }
+        }
+    }
+
+    private void cacheFiles(String cacheFiles, String currentDirectoryPath, PigContext pigContext) throws IOException {
+        if (cacheFiles != null) {
+            Configuration conf = SparkUtil.newJobConf(pigContext);
+            boolean isLocal = System.getenv("SPARK_MASTER").equals("local");
+            for (String file : cacheFiles.split(",")) {
+                String fileName = extractFileName(file.trim());
+                Path src = new Path(extractFileUrl(file.trim()));
+                File tmpFile = File.createTempFile(fileName, ".tmp");
+                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+                FileSystem fs = tmpFilePath.getFileSystem(conf);
+                fs.copyToLocalFile(src, tmpFilePath);
+                tmpFile.deleteOnExit();
+                if (isLocal) {
+                    File localFile = new File(currentDirectoryPath + "/" + fileName);
+                    if (localFile.exists()) {
+                        LOG.info(String.format("cache file %s exists, ready to delete", localFile.getAbsolutePath()));
+                        localFile.delete();
+                    } else {
+                        LOG.info(String.format("cache file %s does not exist", localFile.getAbsolutePath()));
+                    }
+                    Files.copy(Paths.get(tmpFilePath.toString()), Paths.get(localFile.getAbsolutePath()));
+                } else {
+                    sparkContext.addFile(tmpFile.toURI().toURL().toExternalForm());
+                }
+            }
+        }
+    }
+
+    private String extractFileName(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] : null;
+        if (fileName == null) {
+            throw new RuntimeException("cache file is in invalid format, file:" + cacheFileUrl);
+        } else {
+            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            return fileName;
+        }
+    }
+
+    private String extractFileUrl(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileUrl = tmpAry != null && tmpAry.length == 2 ? tmpAry[0] : null;
+        if (fileUrl == null) {
+            throw new RuntimeException("cache file is in invalid format, file:" + cacheFileUrl);
+        } else {
+            LOG.debug("cache file url is valid:" + cacheFileUrl);
+            return fileUrl;
+        }
+    }
+
private static void startSparkIfNeeded() throws PigException {
if (sparkContext == null) {
String master = System.getenv("SPARK_MASTER");
@@ -249,6 +376,11 @@ public class SparkLauncher extends Launc
}
}
+ if (physicalOperator instanceof POStream) {
+ POStream poStream = (POStream) physicalOperator;
+ physicalOperator = new POStreamSpark(poStream);
+ }
+
POConverter converter = convertMap.get(physicalOperator.getClass());
if (converter == null) {
throw new IllegalArgumentException(
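
Note: the cache entries consumed by cacheFiles()/extractFileName()/extractFileUrl()
above follow Pig's <url>#<name> notation (see the cache(':INPATH:/nameMap/part-00000#nameMap')
test in streaming.conf below). A sketch of the split contract, with a hypothetical path:

    // "hdfs://nn/data/part-00000#nameMap" splits into a source URL and a local alias
    String cacheFileUrl = "hdfs://nn/data/part-00000#nameMap"; // hypothetical entry
    String[] parts = cacheFileUrl.split("#");
    String url = parts[0];   // what extractFileUrl(...) returns: where to copy from
    String name = parts[1];  // what extractFileName(...) returns: the local file name
                             // the streaming binary will open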
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java Thu Jan 22 12:10:38 2015
@@ -20,41 +20,42 @@ abstract class POOutputConsumerIterator
abstract protected Result getNextResult() throws ExecException;
private void readNext() {
- try {
- if (result != null && !returned) {
- return;
- }
- // see PigGenericMapBase
- if (result == null) {
- if (!input.hasNext()) {
- finished = true;
+ while (true) {
+ try {
+ if (result != null && !returned) {
return;
}
- Tuple v1 = input.next();
- attach(v1);
- }
- result = getNextResult();
- returned = false;
- switch (result.returnStatus) {
- case POStatus.STATUS_OK:
+ // see PigGenericMapBase
+ if (result == null) {
+ if (!input.hasNext()) {
+ finished = true;
+
+ return;
+ }
+ Tuple v1 = input.next();
+ attach(v1);
+ }
+ result = getNextResult();
returned = false;
- break;
- case POStatus.STATUS_NULL:
- returned = true; // skip: see PigGenericMapBase
- readNext();
- break;
- case POStatus.STATUS_EOP:
- finished = !input.hasNext();
- if (!finished) {
- result = null;
- readNext();
+ switch (result.returnStatus) {
+ case POStatus.STATUS_OK:
+ returned = false;
+ break;
+ case POStatus.STATUS_NULL:
+ returned = true; // skip: see PigGenericMapBase
+ break;
+ case POStatus.STATUS_EOP:
+ finished = !input.hasNext();
+ if (!finished) {
+ result = null;
+ }
+ break;
+ case POStatus.STATUS_ERR:
+ throw new RuntimeException("Error while processing " + result);
}
- break;
- case POStatus.STATUS_ERR:
- throw new RuntimeException("Error while processing " + result);
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
}
- } catch (ExecException e) {
- throw new RuntimeException(e);
}
}
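
Note: the readNext() rewrite above replaces recursive self-calls (previously made on
STATUS_NULL and on a mid-stream STATUS_EOP) with a single while (true) loop, so a long
run of skippable results can no longer deepen the call stack. The pattern in isolation,
as a sketch with hypothetical produce()/deliver() helpers:

    while (true) {                                   // was: recursive readNext()
        Result r = produce();                        // hypothetical: next operator result
        if (r.returnStatus == POStatus.STATUS_NULL) {
            continue;                                // skip and retry; formerly a recursive call
        }
        deliver(r);                                  // hypothetical: hand result to caller
        return;
    }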
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java?rev=1653820&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java Thu Jan 22 12:10:38 2015
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import scala.Function1;
+import scala.Tuple2;
+import scala.runtime.AbstractFunction1;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.pig.data.Tuple;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class StreamConverter implements
+        POConverter<Tuple, Tuple, POStreamSpark> {
+    private static Log LOG = LogFactory.getLog(StreamConverter.class);
+    private byte[] confBytes;
+
+    public StreamConverter(byte[] confBytes) {
+        this.confBytes = confBytes;
+    }
+
+    @Override
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+            POStreamSpark poStream) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, poStream, 1);
+        RDD<Tuple> rdd = predecessors.get(0);
+        RDD<Tuple> rdd2 = rdd.coalesce(1, false, null);
+        long count = 0;
+        try {
+            count = rdd2.count();
+        } catch (Exception e) {
+            System.out.println("Crash in StreamConverter :" + e);
+            LOG.info("Crash in StreamConverter ", e);
+        }
+        StreamFunction streamFunction = new StreamFunction(poStream, count, confBytes);
+        return rdd2.toJavaRDD().mapPartitions(streamFunction, true).rdd();
+    }
+
+    private static class StreamFunction implements
+            FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable {
+        private POStreamSpark poStream;
+        private long total_limit;
+        private long current_val;
+        private boolean proceed = false;
+        private transient JobConf jobConf;
+        private byte[] confBytes;
+
+        private StreamFunction(POStreamSpark poStream, long total_limit,
+                byte[] confBytes) {
+            this.poStream = poStream;
+            this.total_limit = total_limit;
+            this.current_val = 0;
+            this.confBytes = confBytes;
+        }
+
+        void initializeJobConf() {
+            if (this.jobConf == null) {
+                this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+                PigMapReduce.sJobConfInternal.set(jobConf);
+                try {
+                    MapRedUtil.setupUDFContext(jobConf);
+                    PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+                    SchemaTupleBackend.initialize(jobConf, pc);
+                } catch (IOException ioe) {
+                    String msg = "Problem while configuring UDFContext from StreamConverter.";
+                    throw new RuntimeException(msg, ioe);
+                }
+            }
+        }
+
+        public Iterable<Tuple> call(final Iterator<Tuple> input) {
+            initializeJobConf();
+            return new Iterable<Tuple>() {
+                @Override
+                public Iterator<Tuple> iterator() {
+                    return new POOutputConsumerIterator(input) {
+
+                        @Override
+                        protected void attach(Tuple tuple) {
+                            poStream.setInputs(null);
+                            poStream.attachInput(tuple);
+                            try {
+                                current_val = current_val + 1;
+                                if (current_val == total_limit) {
+                                    proceed = true;
+                                } else {
+                                    proceed = false;
+                                }
+                            } catch (Exception e) {
+                                System.out.println("Crash in StreamConverter :" + e);
+                                LOG.info("Crash in StreamConverter ", e);
+                            }
+                        }
+
+                        @Override
+                        protected Result getNextResult() throws ExecException {
+                            Result result = poStream.getNextTuple(proceed);
+                            return result;
+                        }
+                    };
+                }
+            };
+        }
+    }
+}
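
Note: convert() above coalesces to a single partition (so the precomputed count can
flag the final tuple) and then pipes each partition through a FlatMapFunction. A
self-contained sketch of that Spark pattern, assuming the Spark 1.x Java API used on
this branch (class name, data, and app name are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;

    public class MapPartitionsSketch {
        public static void main(String[] args) {
            JavaSparkContext sc = new JavaSparkContext("local", "sketch");
            // one partition, as StreamConverter does with coalesce(1, ...)
            JavaRDD<String> input = sc.parallelize(Arrays.asList("a", "b", "c")).coalesce(1);
            JavaRDD<String> out = input.mapPartitions(
                    new FlatMapFunction<Iterator<String>, String>() {
                        @Override
                        public Iterable<String> call(Iterator<String> it) {
                            List<String> results = new ArrayList<String>();
                            while (it.hasNext()) {
                                // stand-in for piping a tuple through the external binary
                                results.add(it.next().toUpperCase());
                            }
                            return results;
                        }
                    });
            System.out.println(out.collect());
            sc.stop();
        }
    }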
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java?rev=1653820&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java Thu Jan 22 12:10:38 2015
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+
+public class POStreamSpark extends POStream {
+    public POStreamSpark(POStream copy) {
+        super(copy);
+        this.command = copy.getCommand();
+        this.executableManagerStr = copy.getExecutableManagerStr();
+    }
+
+    public Result getNextTuple(boolean proceed) throws ExecException {
+        // The POStream Operator works with ExecutableManager to
+        // send input to the streaming binary and to get output
+        // from it. To achieve a tuple oriented behavior, two queues
+        // are used - one for output from the binary and one for
+        // input to the binary. In each getNext() call:
+        // 1) If there is no more output expected from the binary, an EOP is
+        // sent to successor
+        // 2) If there is any output from the binary in the queue, it is passed
+        // down to the successor
+        // 3) if neither of these two are true and if it is possible to
+        // send input to the binary, then the next tuple from the
+        // predecessor is got and passed to the binary
+        try {
+            // if we are being called AFTER all output from the streaming
+            // binary has already been sent to us then just return EOP
+            // The "allOutputFromBinaryProcessed" flag is set when we see
+            // an EOS (End of Stream output) from streaming binary
+            if (allOutputFromBinaryProcessed) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+
+            // if we are here AFTER all map() calls have been completed
+            // AND AFTER we process all possible input to be sent to the
+            // streaming binary, then all we want to do is read output from
+            // the streaming binary
+            if (allInputFromPredecessorConsumed) {
+                Result r = binaryOutputQueue.take();
+                if (r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also since we are being called
+                    // after all input from predecessor has been processed
+                    // it means we got here from a call from close() in
+                    // map or reduce. So once we send this EOP down,
+                    // getNext() in POStream should never be called. So
+                    // we don't need to set any flag noting we saw all output
+                    // from binary
+                    r = EOP_RESULT;
+                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    illustratorMarkup(r.result, r.result, 0);
+                return (r);
+            }
+
+            // if we are here, we haven't consumed all input to be sent
+            // to the streaming binary - check if we are being called
+            // from close() on the map or reduce
+            Result r = getNextHelper((Tuple) null);
+            if (isFetchable() || proceed) {
+                if (r.returnStatus == POStatus.STATUS_EOP) {
+                    // we have now seen *ALL* possible input
+                    // check if we ever had any real input
+                    // in the course of the map/reduce - if we did
+                    // then "initialized" will be true. If not, just
+                    // send EOP down.
+                    if (getInitialized()) {
+                        // signal End of ALL input to the Executable Manager's
+                        // Input handler thread
+                        binaryInputQueue.put(r);
+                        // note this state for future calls
+                        allInputFromPredecessorConsumed = true;
+                        // look for output from binary
+                        r = binaryOutputQueue.take();
+                        if (r.returnStatus == POStatus.STATUS_EOS) {
+                            // If we received EOS, it means all output
+                            // from the streaming binary has been sent to us
+                            // So we can send an EOP to the successor in
+                            // the pipeline. Also since we are being called
+                            // after all input from predecessor has been
+                            // processed
+                            // it means we got here from a call from close() in
+                            // map or reduce. So once we send this EOP down,
+                            // getNext() in POStream should never be called. So
+                            // we don't need to set any flag noting we saw all
+                            // output
+                            // from binary
+                            r = EOP_RESULT;
+                        }
+                    }
+
+                } else if (r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also we are being called
+                    // from close() in map or reduce (this is so because
+                    // only then this.parentPlan.endOfAllInput is true).
+                    // So once we send this EOP down, getNext() in POStream
+                    // should never be called. So we don't need to set any
+                    // flag noting we saw all output from binary
+                    r = EOP_RESULT;
+                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    illustratorMarkup(r.result, r.result, 0);
+                return r;
+            } else {
+                // we are not being called from close() - so
+                // we must be called from either map() or reduce()
+                // get the next Result from helper
+                if (r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline and also note this condition
+                    // for future calls
+                    r = EOP_RESULT;
+                    allOutputFromBinaryProcessed = true;
+                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    illustratorMarkup(r.result, r.result, 0);
+                return r;
+            }
+
+        } catch (Exception e) {
+            int errCode = 2083;
+            String msg = "Error while trying to get next result in POStream.";
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+}
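
Note: the boolean parameter is what distinguishes this method from POStream.getNextTuple().
In the MR backend the "seen all input" decision comes from parentPlan.endOfAllInput (see the
comment in the middle branch above), which a Spark task cannot observe; StreamConverter
instead counts tuples against the partition total and passes proceed == true only for the
final one. Its attach() logic reduces to:

    current_val = current_val + 1;              // tuples attached so far
    proceed = (current_val == total_limit);     // true only on the last tuple,
                                                // standing in for endOfAllInput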
Modified: pig/branches/spark/test/e2e/pig/tests/streaming.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/streaming.conf?rev=1653820&r1=1653819&r2=1653820&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/streaming.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/streaming.conf Thu Jan 22 12:10:38 2015
@@ -79,7 +79,7 @@ store C into ':OUTPATH:';#,
{
#Section 1.1: perl script, no parameters, autoship(Section 2.1)
'num' => 4,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $0, $1, $2;
@@ -90,7 +90,7 @@ store C into ':OUTPATH:';#,
{
# Section 1.2: perl script that takes parameters; explicit ship of script (Section 2.1)
'num' => 5,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl - -` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 1);
A = load ':INPATH:/singlefile/studenttab10k';
@@ -102,7 +102,7 @@ store C into ':OUTPATH:';#,
{
# Section 1.3: define clause; explicit ship of script (Section 2.1)
'num' => 6,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -115,7 +115,7 @@ store D into ':OUTPATH:';#,
{
# Section 1.4: grouped data
'num' => 7,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -128,7 +128,7 @@ store D into ':OUTPATH:';#,
{
# Section 1.4: grouped and ordered data
'num' => 8,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -144,7 +144,7 @@ store E into ':OUTPATH:';#,
{
# Section 1.5: multiple streaming operators - adjacent - map side
'num' => 9,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -157,7 +157,7 @@ store D into ':OUTPATH:';#,
{
# Section 1.5: multiple streaming operators - not adjacent - map side
'num' => 10,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
@@ -172,7 +172,7 @@ store E into ':OUTPATH:';#,
{
# Section 1.5: multiple streaming operators - adjacent - reduce side
'num' => 11,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD1 `perl GroupBy.pl '\t' 0 1` ship(':SCRIPTHOMEPATH:/GroupBy.pl') stderr('CMD1');
define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm') stderr('CMD2');
@@ -191,7 +191,7 @@ store F into ':OUTPATH:';#,
# Section 1.5: multiple streaming operators - one on map and one on reduce side
# same alias name
'num' => 12,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD1 `perl GroupBy.pl '\t' 0` ship(':SCRIPTHOMEPATH:/GroupBy.pl');
define CMD2 `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
@@ -206,7 +206,7 @@ store B into ':OUTPATH:';#,
{
# Section 1.5: multiple streaming operators - adjacent - map side
'num' => 13,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -232,7 +232,7 @@ store D into ':OUTPATH:';#,
# Section 2.1: perl script and its dependency shipped
# Also covers part of section 3.1: custom serializer
'num' => 1,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreamingDepend.pl` input(stdin) ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -243,7 +243,7 @@ store B into ':OUTPATH:';#,
{
# Section 2.1: perl script and supported data file is shipped
'num' => 2,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -257,7 +257,7 @@ store E into ':OUTPATH:';#,
{
# Section 2.2: script is shipped while the supporting file is cached
'num' => 3,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q@
define CMD `perl PigStreaming.pl - - nameMap` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') cache(':INPATH:/nameMap/part-00000#nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -283,7 +283,7 @@ store E into ':OUTPATH:';@,
{
# Section 3.1: use of custom deserializer
'num' => 1,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl` output(stdout) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -294,7 +294,7 @@ store B into ':OUTPATH:';#,
{
# Section 3.1: use of custom serializer and deserializer
'num' => 2,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
register :FUNCPATH:/testudf.jar;
define CMD `perl PigStreaming.pl` input(stdin using org.apache.pig.test.udf.streaming.StreamingDump) output(stdout using org.apache.pig.test.udf.streaming.DumpStreamer) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
@@ -307,7 +307,7 @@ store C into ':OUTPATH:';#,
{
# Section 3.3: streaming application reads from file rather than stdin
'num' => 3,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl foo -` input('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -318,7 +318,7 @@ store B into ':OUTPATH:';#,
{
# Section 3.4: streaming application writes single output to a file
'num' => 4,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl - foo nameMap` output('foo') ship(':SCRIPTHOMEPATH:/PigStreaming.pl', ':SCRIPTHOMEPATH:/nameMap');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -330,7 +330,7 @@ store C into ':OUTPATH:';#,
{
# Section 3.4: streaming application writes multiple outputs to file
'num' => 5,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreamingDepend.pl - sio_5_1 sio_5_2` input(stdin) output('sio_5_1', 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -341,7 +341,7 @@ store B into ':OUTPATH:';#,
{
# Section 3.4: streaming application writes multiple outputs: 1 to file and 1 to stdout
'num' => 6,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreamingDepend.pl - - sio_5_2` input(stdin) output(stdout, 'sio_5_2') ship(':SCRIPTHOMEPATH:/PigStreamingDepend.pl', ':SCRIPTHOMEPATH:/PigStreamingModule.pm');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -362,7 +362,7 @@ store B into ':OUTPATH:';#,
{
# Section 4.3: integration with parameter substitution
'num' => 1,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig_params' => ['-p', qq(script_name='PigStreaming.pl')],
'pig' => q#
define CMD `perl $script_name - - nameMap` ship(':SCRIPTHOMEPATH:/$script_name', ':SCRIPTHOMEPATH:/nameMap');
@@ -387,7 +387,7 @@ store E into ':OUTPATH:';#,
{
# Section 5.1: load/store optimization
'num' => 1,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -422,7 +422,7 @@ store D into ':OUTPATH:';#,
{
# PIG-272: problem with optimization and intermediate store
'num' => 3,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD1 `perl -ne 'print $_;print STDERR "stderr $_";'`;
define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
@@ -444,7 +444,7 @@ store D into ':OUTPATH:';#,
{
# PIG-272: problem with optimization and intermediate store
'num' => 4,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD1 `perl -ne 'print $_;'`;
define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
@@ -472,7 +472,7 @@ store E into ':OUTPATH:';#,
# Make sure join with stream optimization works
# optimization only on load side
'num' => 5,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `cat` as (name:chararray, age:int, gpa:double);
@@ -485,7 +485,7 @@ store D into ':OUTPATH:';#,
# Make sure join with stream optimization works
# optimization only on store side
'num' => 6,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = filter A by $1 > 25;
@@ -500,7 +500,7 @@ store D into ':OUTPATH:';#,
# Make sure join with stream optimization works
# optimization on load and store
'num' => 7,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through `cat` as (name:chararray, age:int, gpa:double);
@@ -536,7 +536,7 @@ store B into ':OUTPATH:';#,
# case where binary finishes normally
# BEFORE all input has been passed to it
'num' => 2,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -565,7 +565,7 @@ store D into ':OUTPATH:';#,
# BEFORE all input has been passed to it
# FIXME: in local mode
'num' => 4,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
B = stream A through `head -1` as (name, age, gpa);
@@ -581,7 +581,7 @@ store E into ':OUTPATH:';#,
# BEFORE all input has been passed to it
# and emits no output
'num' => 5,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -598,7 +598,7 @@ store D into ':OUTPATH:';#,
# BEFORE all input has been passed to it
# and emits no output
'num' => 6,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
@@ -613,7 +613,7 @@ store E into ':OUTPATH:';#,
# two stream operators one after another where first
# one emits no output
'num' => 7,
- 'execonly' => 'mapred,tez',
+ 'execonly' => 'mapred,tez,spark',
'pig' => q#
define CMD `perl DieRandomly.pl 10000 0` ship(':SCRIPTHOMEPATH:/DieRandomly.pl');
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);