Author: xuefu
Date: Tue Mar 10 04:37:36 2015
New Revision: 1665404
URL: http://svn.apache.org/r1665404
Log:
PIG-4374: Add SparkPlan in spark package (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
Removed:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOper.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOper.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1665404&r1=1665403&r2=1665404&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Tue Mar 10 04:37:36 2015
@@ -39,7 +39,6 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -87,8 +86,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
-import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOper;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPOPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -112,457 +111,529 @@ import org.apache.spark.scheduler.StatsR
*/
public class SparkLauncher extends Launcher {
- private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
+ private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
- // Our connection to Spark. It needs to be static so that it can be reused
- // across jobs, because a
- // new SparkLauncher gets created for each job.
- private static JavaSparkContext sparkContext = null;
-    private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
-
- public static BroadCastServer bcaster;
- private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
- .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
- // An object that handle cache calls in the operator graph. This is again
- // static because we want
- // it to be shared across SparkLaunchers. It gets cleared whenever we close
- // the SparkContext.
- // private static CacheConverter cacheConverter = null;
- private String jobGroupID;
-
- @Override
- public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
- PigContext pigContext) throws Exception {
- LOG.info("!!!!!!!!!! Launching Spark (woot) !!!!!!!!!!!!");
- LOG.debug(physicalPlan);
- JobConf c = SparkUtil.newJobConf(pigContext);
-        c.set(PigConstants.LOCAL_CODE_DIR, System.getProperty("java.io.tmpdir"));
-
- SchemaTupleBackend.initialize(c, pigContext);
- SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
-
- if (System.getenv("BROADCAST_PORT") == null
- && System.getenv("BROADCAST_MASTER_IP") == null) {
- LOG.warn("Missing BROADCAST_POST/BROADCAST_HOST in the
environment.");
- } else {
- if (bcaster == null) {
- bcaster = new BroadCastServer();
- bcaster.startBroadcastServer(Integer.parseInt(System
- .getenv("BROADCAST_PORT")));
- }
- }
-
- SparkPigStats sparkStats = (SparkPigStats)
- pigContext.getExecutionEngine().instantiatePigStats();
- PigStats.start(sparkStats);
-
- startSparkIfNeeded();
-        // Set a unique group id for this query, so we can lookup all Spark job ids
- // related to this query.
- jobGroupID = UUID.randomUUID().toString();
-        sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);
- jobMetricsListener.reset();
-
-        String currentDirectoryPath = Paths.get(".").toAbsolutePath().normalize().toString() + "/";
- startSparkJob(pigContext, currentDirectoryPath);
- LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
- physicalPlan, POStore.class);
- POStore firstStore = stores.getFirst();
- if( firstStore != null ){
- MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext, c);
- }
-
- // ObjectSerializer.serialize(c);
- byte[] confBytes = KryoSerializer.serializeJobConf(c);
- // initialize the supported converters
-        Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
-
- convertMap.put(POLoad.class, new LoadConverter(pigContext,
- physicalPlan, sparkContext.sc()));
- convertMap.put(POStore.class, new StoreConverter(pigContext));
- convertMap.put(POForEach.class, new ForEachConverter(confBytes));
- convertMap.put(POFilter.class, new FilterConverter());
- convertMap.put(POPackage.class, new PackageConverter(confBytes
- ));
- // convertMap.put(POCache.class, cacheConverter);
- convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
-        convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
- convertMap.put(POLimit.class, new LimitConverter());
- convertMap.put(PODistinct.class, new DistinctConverter());
- convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
- convertMap.put(POSort.class, new SortConverter());
- convertMap.put(POSplit.class, new SplitConverter());
- convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
- convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
- convertMap.put(POCounter.class, new CounterConverter());
- convertMap.put(PORank.class, new RankConverter());
- convertMap.put(POStreamSpark.class,new StreamConverter(confBytes));
-
- sparkPlanToRDD(sparkplan,convertMap, sparkStats, c);
-
- cleanUpSparkJob(pigContext,currentDirectoryPath);
- sparkStats.finish();
- return sparkStats;
- }
-
-
- /**
- * In Spark, currently only async actions return job id.
- * There is no async equivalent of actions like saveAsNewAPIHadoopFile()
- *
-     * The only other way to get a job id is to register a "job group ID" with the
-     * spark context and request all job ids corresponding to that job group via
-     * getJobIdsForGroup.
-     *
-     * However getJobIdsForGroup does not guarantee the order of the elements in
-     * it's result.
- *
- * This method simply returns the previously unseen job ids.
- *
- * @param seenJobIDs job ids in the job group that are already seen
- * @return Spark job ids not seen before
- */
- private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
- Set<Integer> groupjobIDs = new HashSet<Integer>(Arrays.asList(
- ArrayUtils.toObject(sparkContext.statusTracker()
- .getJobIdsForGroup(jobGroupID))));
- groupjobIDs.removeAll(seenJobIDs);
- List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
- if (unseenJobIDs.size() == 0) {
-            throw new RuntimeException("Expected at least one unseen jobID " +
-                " in this call to getJobIdsForGroup, but got " + unseenJobIDs.size());
- }
-
- seenJobIDs.addAll(unseenJobIDs);
- return unseenJobIDs;
- }
-
-    private void cleanUpSparkJob(PigContext pigContext, String currentDirectoryPath) {
-        LOG.info("clean up Spark Job");
-        boolean isLocal = System.getenv("SPARK_MASTER")!= null?System.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL"): true;
-        if (isLocal) {
-            String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
-            if (shipFiles != null) {
-                for (String file : shipFiles.split(",")) {
-                    File shipFile = new File(file);
-                    File deleteFile = new File(currentDirectoryPath + "/" + shipFile.getName());
-                    if (deleteFile.exists()) {
-                        LOG.info(String.format("delete ship file result: %b", deleteFile.delete()));
-                    }
-                }
-            }
-            String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files");
-            if (cacheFiles != null) {
-                for (String file : cacheFiles.split(",")) {
-                    String fileName = extractFileName(file.trim());
-                    File deleteFile = new File(currentDirectoryPath + "/" + fileName);
-                    if (deleteFile.exists()) {
-                        LOG.info(String.format("delete cache file result: %b", deleteFile.delete()));
-                    }
-                }
-            }
-        }
-    }
-
-    private void startSparkJob(PigContext pigContext, String currentDirectoryPath) throws IOException {
-        LOG.info("start Spark Job");
-        String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files");
-        shipFiles(shipFiles, currentDirectoryPath);
-        String cacheFiles = pigContext.getProperties().getProperty("pig.streaming.cache.files");
-        cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
-    }
-
-    private void shipFiles(String shipFiles, String currentDirectoryPath) throws IOException {
-        if (shipFiles != null) {
-            for (String file : shipFiles.split(",")) {
-                File shipFile = new File(file.trim());
-                if (shipFile.exists()) {
-                    LOG.info(String.format("shipFile:%s",shipFile));
-                    boolean isLocal = System.getenv("SPARK_MASTER")!= null?System.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL"): true;
-                    if (isLocal) {
-                        File localFile = new File(currentDirectoryPath+"/" + shipFile.getName());
-                        if( localFile.exists()){
-                            LOG.info(String.format("ship file %s exists, ready to delete",localFile.getAbsolutePath()));
-                            localFile.delete();
-                        } else{
-                            LOG.info(String.format("ship file %s not exists,",localFile.getAbsolutePath()));
-                        }
-                        Files.copy(shipFile.toPath(), Paths.get(localFile.getAbsolutePath()));
-                    } else {
-                        sparkContext.addFile(shipFile.toURI().toURL().toExternalForm());
-                    }
-                }
-            }
-        }
-    }
-
-    private void cacheFiles(String cacheFiles, String currentDirectoryPath, PigContext pigContext) throws IOException {
-        if (cacheFiles != null) {
-            Configuration conf = SparkUtil.newJobConf(pigContext);
-            boolean isLocal = System.getenv("SPARK_MASTER")!= null?System.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL"): true;
-            for (String file : cacheFiles.split(",")) {
-                String fileName = extractFileName(file.trim());
-                Path src = new Path(extractFileUrl(file.trim()));
-                File tmpFile = File.createTempFile(fileName,".tmp");
-                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
-                FileSystem fs = tmpFilePath.getFileSystem(conf);
-                fs.copyToLocalFile(src, tmpFilePath);
-                tmpFile.deleteOnExit();
-                if (isLocal) {
-                    File localFile = new File(currentDirectoryPath + "/" + fileName);
-                    if( localFile.exists()){
-                        LOG.info(String.format("cache file %s exists, ready to delete",localFile.getAbsolutePath()));
-                        localFile.delete();
-                    } else{
-                        LOG.info(String.format("cache file %s not exists,",localFile.getAbsolutePath()));
-                    }
-                    Files.copy( Paths.get(tmpFilePath.toString()), Paths.get(localFile.getAbsolutePath()));
-                } else {
-                    sparkContext.addFile(tmpFile.toURI().toURL().toExternalForm());
-                }
-            }
-        }
-    }
-
- private String extractFileName(String cacheFileUrl) {
- String[] tmpAry = cacheFileUrl.split("#");
-        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] : null;
-        if (fileName == null) {
-            throw new RuntimeException("cache file is invalid format, file:" + cacheFileUrl);
- } else {
- LOG.debug("cache file name is valid:" + cacheFileUrl);
- return fileName;
- }
- }
-
- private String extractFileUrl(String cacheFileUrl) {
- String[] tmpAry = cacheFileUrl.split("#");
-        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0] : null;
-        if (fileName == null) {
-            throw new RuntimeException("cache file is invalid format, file:" + cacheFileUrl);
- } else {
- LOG.debug("cache file name is valid:" + cacheFileUrl);
- return fileName;
- }
- }
-
-    private SparkOperPlan compile(PhysicalPlan physicalPlan, PigContext pigContext) throws PlanException, IOException, VisitorException {
-        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan, pigContext);
- sparkCompiler.compile();
- SparkOperPlan plan = sparkCompiler.getSparkPlan();
-
- // optimize key - value handling in package
-        SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(plan);
- pkgAnnotator.visit();
- return plan;
- }
-
- private static void startSparkIfNeeded() throws PigException {
- if (sparkContext == null) {
- String master = System.getenv("SPARK_MASTER");
- if (master == null) {
- LOG.info("SPARK_MASTER not specified, using \"local\"");
- master = "local";
- }
-
-        String sparkHome = System.getenv("SPARK_HOME"); // It's okay if this
- // is null for local
- // mode
- String sparkJarsSetting = System.getenv("SPARK_JARS");
- String pigJar = System.getenv("SPARK_PIG_JAR");
- String[] sparkJars = sparkJarsSetting == null ? new String[] {}
- : sparkJarsSetting.split(",");
-
- // TODO: Don't hardcode this JAR
- List<String> jars = Lists.asList(pigJar, sparkJars);
-
- if (!master.startsWith("local") && !master.equals("yarn-client")) {
- // Check that we have the Mesos native library and Spark home
- // are set
- if (sparkHome == null) {
-                System.err
-                        .println("You need to set SPARK_HOME to run on a Mesos cluster!");
- throw new PigException("SPARK_HOME is not set");
- }
- /*
- * if (System.getenv("MESOS_NATIVE_LIBRARY") == null) {
- *
- * System.err.println(
- * "You need to set MESOS_NATIVE_LIBRARY to run on a Mesos
cluster!"
- * ); throw new PigException("MESOS_NATIVE_LIBRARY is not
set");
- * }
- *
- * // Tell Spark to use Mesos in coarse-grained mode (only
- * affects Spark 0.6+; no impact on others)
- * System.setProperty("spark.mesos.coarse", "true");
- */
- }
-
-//        // For coarse-grained Mesos mode, tell it an upper bound on how many
-// // cores to grab in total;
-// // we conservatively set this to 32 unless the user set the
-// // SPARK_MAX_CPUS environment variable.
-// if (System.getenv("SPARK_MAX_CPUS") != null) {
-// int maxCores = 32;
-// maxCores = Integer.parseInt(System.getenv("SPARK_MAX_CPUS"));
-// System.setProperty("spark.cores.max", "" + maxCores);
-// }
-// System.setProperty("spark.cores.max", "1");
-// System.setProperty("spark.executor.memory", "" + "512m");
-// System.setProperty("spark.shuffle.memoryFraction", "0.0");
-// System.setProperty("spark.storage.memoryFraction", "0.0");
-
- sparkContext = new JavaSparkContext(master,
- "Spork", sparkHome, jars.toArray(new String[jars.size()]));
- sparkContext.sc().addSparkListener(new StatsReportListener());
- sparkContext.sc().addSparkListener(new JobLogger());
- sparkContext.sc().addSparkListener(jobMetricsListener);
- // cacheConverter = new CacheConverter();
- }
- }
-
- // You can use this in unit tests to stop the SparkContext between tests.
- static void stopSpark() {
- if (sparkContext != null) {
- sparkContext.stop();
- sparkContext = null;
- // cacheConverter = null;
- }
- }
-
-    private void sparkPlanToRDD(SparkOperPlan sparkPlan, Map<Class<? extends PhysicalOperator>, POConverter> convertMap, SparkPigStats sparkStats, JobConf c) throws IOException , InterruptedException {
- Set<Integer> seenJobIDs = new HashSet<Integer>();
- if (sparkPlan != null) {
- List<SparkOper> leaves = sparkPlan.getLeaves();
- Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap();
- for (SparkOper leaf : leaves) {
- Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap();
-                sparkOperToRDD(sparkPlan, leaf, sparkOpRdds, physicalOpRdds, convertMap, seenJobIDs, sparkStats, c);
-
- }
- }
- }
-
- private void sparkOperToRDD(SparkOperPlan sparkPlan,
-                                SparkOper sparkOper,Map<OperatorKey, RDD<Tuple>> sparkOpRdds, Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
-                                Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
-                                Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf c ) throws IOException, InterruptedException {
-
- List<SparkOper> predecessors = sparkPlan.getPredecessors(sparkOper);
- List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
- if (predecessors != null) {
- for (SparkOper prede : predecessors) {
- if (sparkOpRdds.get(prede.getOperatorKey()) == null) {
-                    sparkOperToRDD(sparkPlan, prede, sparkOpRdds, physicalOpRdds, convertMap,seenJobIDs, sparkStats, c);
- }
- predecessorRDDs.add(sparkOpRdds.get(prede.getOperatorKey()));
- }
- }
-
- List<PhysicalOperator> leafPOs = sparkOper.plan.getLeaves();
- if (leafPOs != null && leafPOs.size() != 1) {
-            throw new IllegalArgumentException(String.format("sparkOper.plan should have 1 leaf, but sparkOper.plan.getLeaves() not equals 1, sparkOper:{}" + sparkOper.name()));
- } else {
- PhysicalOperator leafPO = leafPOs.get(0);
-            physicalToRDD(sparkOper.plan, leafPO, physicalOpRdds, predecessorRDDs, convertMap);
-            sparkOpRdds.put(sparkOper.getOperatorKey(),physicalOpRdds.get(leafPO.getOperatorKey()));
- }
-
- List<POStore> poStores = PlanHelper.getPhysicalOperators(
- sparkOper.plan, POStore.class);
- if( poStores!=null && poStores.size() ==1){
- POStore poStore = poStores.get(0);
- for (int jobID : getJobIDs(seenJobIDs)) {
- SparkStatsUtil.waitForJobAddStats(jobID, poStore,
- jobMetricsListener, sparkContext, sparkStats, c);
- }
- } else{
- LOG.info(String.format("sparkOper:{} does not have POStore or
sparkOper has more than 1 POStore",sparkOper.name()));
- }
-
- }
-
- private void physicalToRDD(PhysicalPlan plan,
- PhysicalOperator physicalOperator,
-                               Map<OperatorKey, RDD<Tuple>> rdds, List<RDD<Tuple>> rddsFromPredeSparkOper,
-                               Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
- throws IOException {
- RDD<Tuple> nextRDD = null;
- List<PhysicalOperator> predecessors = plan
- .getPredecessors(physicalOperator);
- List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
- if (predecessors != null) {
- for (PhysicalOperator predecessor : predecessors) {
-                physicalToRDD(plan, predecessor, rdds,rddsFromPredeSparkOper, convertMap);
- predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
- }
- }else{
-            if( rddsFromPredeSparkOper!=null && rddsFromPredeSparkOper.size()>0 ){
- predecessorRdds.addAll(rddsFromPredeSparkOper);
- }
- }
-
- POConverter converter = convertMap.get(physicalOperator.getClass());
- if (converter == null) {
- throw new IllegalArgumentException(
- "Spork unsupported PhysicalOperator: " + physicalOperator);
- }
-
- LOG.info("Converting operator "
- + physicalOperator.getClass().getSimpleName() + " "
- + physicalOperator);
- nextRDD = converter.convert(predecessorRdds, physicalOperator);
-
- if (nextRDD == null) {
- throw new IllegalArgumentException(
- "RDD should not be null after PhysicalOperator: "
- + physicalOperator);
- }
-
- rdds.put(physicalOperator.getOperatorKey(), nextRDD);
- }
-
- @Override
- public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
- String format, boolean verbose) throws IOException {
- SparkOperPlan sparkPlan = compile(pp, pc);
- ps.println("#-----------------------------------------------------#");
- ps.println("# Spark plan is A DAG, the Spark node relations are:");
- ps.println("#-----------------------------------------------------#");
- Map<OperatorKey, SparkOper> allOperKeys= sparkPlan.getKeys();
- List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
- Collections.sort(operKeyList);
- for(OperatorKey operatorKey: operKeyList){
- SparkOper op = sparkPlan.getOperator(operatorKey);
- ps.print(op.getOperatorKey());
- List<SparkOper> successors = sparkPlan.getSuccessors(op);
- if( successors!=null) {
- ps.print("->");
- for (SparkOper suc : successors) {
- ps.print(suc.getOperatorKey() + " ");
- }
- }
- ps.println();
- }
- if (format.equals("text")) {
- SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
- printer.setVerbose(verbose);
- printer.visit();
- } else { // TODO: add support for other file format
- throw new IOException("Non-text output of explain is not
supported.");
- }
- }
-
- @Override
- public void kill() throws BackendException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void killJob(String jobID, Configuration conf)
- throws BackendException {
- // TODO Auto-generated method stub
+    // Our connection to Spark. It needs to be static so that it can be reused
+    // across jobs, because a
+    // new SparkLauncher gets created for each job.
+    private static JavaSparkContext sparkContext = null;
+    private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
+
+ public static BroadCastServer bcaster;
+ private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
+ .compile("\\.(zip|tgz|tar\\.gz|tar)$").matcher("");
+    // An object that handles cache calls in the operator graph. This is again
+    // static because we want
+    // it to be shared across SparkLaunchers. It gets cleared whenever we close
+    // the SparkContext.
+    // private static CacheConverter cacheConverter = null;
+ private String jobGroupID;
+
+ @Override
+ public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
+ PigContext pigContext) throws Exception {
+ LOG.info("!!!!!!!!!! Launching Spark (woot) !!!!!!!!!!!!");
+ LOG.debug(physicalPlan);
+ JobConf jobConf = SparkUtil.newJobConf(pigContext);
+ jobConf.set(PigConstants.LOCAL_CODE_DIR,
+ System.getProperty("java.io.tmpdir"));
+
+ SchemaTupleBackend.initialize(jobConf, pigContext);
+ SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+
+ if (System.getenv("BROADCAST_PORT") == null
+ && System.getenv("BROADCAST_MASTER_IP") ==
null) {
+ LOG.warn("Missing BROADCAST_POST/BROADCAST_HOST in the
environment.");
+ } else {
+ if (bcaster == null) {
+ bcaster = new BroadCastServer();
+
bcaster.startBroadcastServer(Integer.parseInt(System
+ .getenv("BROADCAST_PORT")));
+ }
+ }
+
+ SparkPigStats sparkStats = (SparkPigStats) pigContext
+ .getExecutionEngine().instantiatePigStats();
+ PigStats.start(sparkStats);
+
+ startSparkIfNeeded();
+        // Set a unique group id for this query, so we can look up all Spark
+        // job ids related to this query.
+        jobGroupID = UUID.randomUUID().toString();
+        sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
+                false);
+ jobMetricsListener.reset();
+
+ String currentDirectoryPath = Paths.get(".").toAbsolutePath()
+ .normalize().toString()
+ + "/";
+ startSparkJob(pigContext, currentDirectoryPath);
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+ physicalPlan, POStore.class);
+ POStore firstStore = stores.getFirst();
+ if (firstStore != null) {
+            MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
+                    jobConf);
+ }
+
+ // ObjectSerializer.serialize(c);
+ byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+ // initialize the supported converters
+        Map<Class<? extends PhysicalOperator>, POConverter> convertMap = new HashMap<Class<? extends PhysicalOperator>, POConverter>();
+
+ convertMap.put(POLoad.class, new LoadConverter(pigContext,
+ physicalPlan, sparkContext.sc()));
+ convertMap.put(POStore.class, new StoreConverter(pigContext));
+        convertMap.put(POForEach.class, new ForEachConverter(confBytes));
+        convertMap.put(POFilter.class, new FilterConverter());
+        convertMap.put(POPackage.class, new PackageConverter(confBytes));
+        // convertMap.put(POCache.class, cacheConverter);
+        convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
+        convertMap.put(POGlobalRearrange.class, new GlobalRearrangeConverter());
+        convertMap.put(POLimit.class, new LimitConverter());
+        convertMap.put(PODistinct.class, new DistinctConverter());
+        convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
+        convertMap.put(POSort.class, new SortConverter());
+        convertMap.put(POSplit.class, new SplitConverter());
+        convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
+        convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+        convertMap.put(POCounter.class, new CounterConverter());
+        convertMap.put(PORank.class, new RankConverter());
+        convertMap.put(POStreamSpark.class, new StreamConverter(confBytes));
+
+ sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
+
+ cleanUpSparkJob(pigContext, currentDirectoryPath);
+ sparkStats.finish();
+ return sparkStats;
+ }
+
+    /**
+     * In Spark, currently only async actions return job id. There is no async
+     * equivalent of actions like saveAsNewAPIHadoopFile()
+     *
+     * The only other way to get a job id is to register a "job group ID" with
+     * the spark context and request all job ids corresponding to that job
+     * group via getJobIdsForGroup.
+     *
+     * However getJobIdsForGroup does not guarantee the order of the elements
+     * in its result.
+     *
+     * This method simply returns the previously unseen job ids.
+     *
+     * @param seenJobIDs
+     *            job ids in the job group that are already seen
+     * @return Spark job ids not seen before
+     */
+    private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+        Set<Integer> groupjobIDs = new HashSet<Integer>(
+                Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+                        .getJobIdsForGroup(jobGroupID))));
+        groupjobIDs.removeAll(seenJobIDs);
+        List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
+        if (unseenJobIDs.size() == 0) {
+            throw new RuntimeException("Expected at least one unseen jobID "
+                    + " in this call to getJobIdsForGroup, but got "
+                    + unseenJobIDs.size());
+        }
+
+        seenJobIDs.addAll(unseenJobIDs);
+        return unseenJobIDs;
+    }
+
+    private void cleanUpSparkJob(PigContext pigContext,
+            String currentDirectoryPath) {
+        LOG.info("clean up Spark Job");
+        boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+        if (isLocal) {
+            String shipFiles = pigContext.getProperties().getProperty(
+                    "pig.streaming.ship.files");
+            if (shipFiles != null) {
+                for (String file : shipFiles.split(",")) {
+                    File shipFile = new File(file);
+                    File deleteFile = new File(currentDirectoryPath + "/"
+                            + shipFile.getName());
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete ship file result: %b",
+                                deleteFile.delete()));
+                    }
+                }
+            }
+            String cacheFiles = pigContext.getProperties().getProperty(
+                    "pig.streaming.cache.files");
+            if (cacheFiles != null) {
+                for (String file : cacheFiles.split(",")) {
+                    String fileName = extractFileName(file.trim());
+                    File deleteFile = new File(currentDirectoryPath + "/"
+                            + fileName);
+                    if (deleteFile.exists()) {
+                        LOG.info(String.format("delete cache file result: %b",
+                                deleteFile.delete()));
+                    }
+                }
+            }
+        }
+    }
+
+    private void startSparkJob(PigContext pigContext,
+            String currentDirectoryPath) throws IOException {
+        LOG.info("start Spark Job");
+        String shipFiles = pigContext.getProperties().getProperty(
+                "pig.streaming.ship.files");
+        shipFiles(shipFiles, currentDirectoryPath);
+        String cacheFiles = pigContext.getProperties().getProperty(
+                "pig.streaming.cache.files");
+        cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
+    }
+
+    private void shipFiles(String shipFiles, String currentDirectoryPath)
+            throws IOException {
+        if (shipFiles != null) {
+            for (String file : shipFiles.split(",")) {
+                File shipFile = new File(file.trim());
+                if (shipFile.exists()) {
+                    LOG.info(String.format("shipFile:%s", shipFile));
+                    boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                            .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL")
+                            : true;
+                    if (isLocal) {
+                        File localFile = new File(currentDirectoryPath + "/"
+                                + shipFile.getName());
+                        if (localFile.exists()) {
+                            LOG.info(String.format(
+                                    "ship file %s exists, ready to delete",
+                                    localFile.getAbsolutePath()));
+                            localFile.delete();
+                        } else {
+                            LOG.info(String.format("ship file %s does not exist",
+                                    localFile.getAbsolutePath()));
+                        }
+                        Files.copy(shipFile.toPath(),
+                                Paths.get(localFile.getAbsolutePath()));
+                    } else {
+                        sparkContext.addFile(shipFile.toURI().toURL()
+                                .toExternalForm());
+                    }
+                }
+            }
+        }
+    }
+
+    private void cacheFiles(String cacheFiles, String currentDirectoryPath,
+            PigContext pigContext) throws IOException {
+        if (cacheFiles != null) {
+            Configuration conf = SparkUtil.newJobConf(pigContext);
+            boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+                    .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+            for (String file : cacheFiles.split(",")) {
+                String fileName = extractFileName(file.trim());
+                Path src = new Path(extractFileUrl(file.trim()));
+                File tmpFile = File.createTempFile(fileName, ".tmp");
+                Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+                FileSystem fs = tmpFilePath.getFileSystem(conf);
+                fs.copyToLocalFile(src, tmpFilePath);
+                tmpFile.deleteOnExit();
+                if (isLocal) {
+                    File localFile = new File(currentDirectoryPath + "/"
+                            + fileName);
+                    if (localFile.exists()) {
+                        LOG.info(String.format(
+                                "cache file %s exists, ready to delete",
+                                localFile.getAbsolutePath()));
+                        localFile.delete();
+                    } else {
+                        LOG.info(String.format("cache file %s does not exist",
+                                localFile.getAbsolutePath()));
+                    }
+                    Files.copy(Paths.get(tmpFilePath.toString()),
+                            Paths.get(localFile.getAbsolutePath()));
+                } else {
+                    sparkContext.addFile(tmpFile.toURI().toURL()
+                            .toExternalForm());
+                }
+            }
+        }
+    }
+
+    private String extractFileName(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1]
+                : null;
+        if (fileName == null) {
+            throw new RuntimeException("invalid cache file format, file:"
+                    + cacheFileUrl);
+        } else {
+            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            return fileName;
+        }
+    }
+
+    private String extractFileUrl(String cacheFileUrl) {
+        String[] tmpAry = cacheFileUrl.split("#");
+        String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0]
+                : null;
+        if (fileName == null) {
+            throw new RuntimeException("invalid cache file format, file:"
+                    + cacheFileUrl);
+        } else {
+            LOG.debug("cache file name is valid:" + cacheFileUrl);
+            return fileName;
+        }
+    }
+
+    private SparkOperPlan compile(PhysicalPlan physicalPlan,
+            PigContext pigContext) throws PlanException, IOException,
+            VisitorException {
+        SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
+                pigContext);
+        sparkCompiler.compile();
+        SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
+
+        // optimize key - value handling in package
+        SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
+                sparkPlan);
+        pkgAnnotator.visit();
+        return sparkPlan;
+    }
+
+    private static void startSparkIfNeeded() throws PigException {
+        if (sparkContext == null) {
+            String master = System.getenv("SPARK_MASTER");
+            if (master == null) {
+                LOG.info("SPARK_MASTER not specified, using \"local\"");
+                master = "local";
+            }
+
+            String sparkHome = System.getenv("SPARK_HOME"); // It's okay if this
+                                                            // is null for local
+                                                            // mode
+            String sparkJarsSetting = System.getenv("SPARK_JARS");
+            String pigJar = System.getenv("SPARK_PIG_JAR");
+            String[] sparkJars = sparkJarsSetting == null ? new String[] {}
+                    : sparkJarsSetting.split(",");
+
+            // TODO: Don't hardcode this JAR
+            List<String> jars = Lists.asList(pigJar, sparkJars);
+
+            if (!master.startsWith("local") && !master.equals("yarn-client")) {
+                // Check that the Mesos native library and Spark home are set
+                if (sparkHome == null) {
+                    System.err
+                            .println("You need to set SPARK_HOME to run on a Mesos cluster!");
+                    throw new PigException("SPARK_HOME is not set");
+                }
+                /*
+                 * if (System.getenv("MESOS_NATIVE_LIBRARY") == null) {
+                 *
+                 * System.err.println(
+                 * "You need to set MESOS_NATIVE_LIBRARY to run on a Mesos cluster!"
+                 * ); throw new PigException("MESOS_NATIVE_LIBRARY is not set");
+                 * }
+                 *
+                 * // Tell Spark to use Mesos in coarse-grained mode (only
+                 * affects Spark 0.6+; no impact on others)
+                 * System.setProperty("spark.mesos.coarse", "true");
+                 */
+            }
+
+            // // For coarse-grained Mesos mode, tell it an upper bound on how
+            // // many cores to grab in total;
+            // // we conservatively set this to 32 unless the user set the
+            // // SPARK_MAX_CPUS environment variable.
+            // if (System.getenv("SPARK_MAX_CPUS") != null) {
+            // int maxCores = 32;
+            // maxCores = Integer.parseInt(System.getenv("SPARK_MAX_CPUS"));
+            // System.setProperty("spark.cores.max", "" + maxCores);
+            // }
+            // System.setProperty("spark.cores.max", "1");
+            // System.setProperty("spark.executor.memory", "" + "512m");
+            // System.setProperty("spark.shuffle.memoryFraction", "0.0");
+            // System.setProperty("spark.storage.memoryFraction", "0.0");
+
+            sparkContext = new JavaSparkContext(master, "Spork", sparkHome,
+                    jars.toArray(new String[jars.size()]));
+            sparkContext.sc().addSparkListener(new StatsReportListener());
+            sparkContext.sc().addSparkListener(new JobLogger());
+            sparkContext.sc().addSparkListener(jobMetricsListener);
+            // cacheConverter = new CacheConverter();
+        }
+    }
+
+    // You can use this in unit tests to stop the SparkContext between tests.
+ static void stopSpark() {
+ if (sparkContext != null) {
+ sparkContext.stop();
+ sparkContext = null;
+ // cacheConverter = null;
+ }
+ }
+
+    private void sparkPlanToRDD(SparkOperPlan sparkPlan,
+            Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
+            SparkPigStats sparkStats, JobConf jobConf) throws IOException,
+            InterruptedException {
+        Set<Integer> seenJobIDs = new HashSet<Integer>();
+        if (sparkPlan != null) {
+            List<SparkOperator> leaves = sparkPlan.getLeaves();
+            Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+            for (SparkOperator leaf : leaves) {
+                Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
+                sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+                        physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
+                        jobConf);
+            }
+        } else {
+            throw new RuntimeException("sparkPlan is null");
+        }
+    }
+
+    private void sparkOperToRDD(SparkOperPlan sparkPlan,
+            SparkOperator sparkOperator,
+            Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
+            Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
+            Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
+            Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf conf)
+            throws IOException, InterruptedException {
+
+        List<SparkOperator> predecessors = sparkPlan
+                .getPredecessors(sparkOperator);
+        List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+        if (predecessors != null) {
+            for (SparkOperator pred : predecessors) {
+                if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
+                    sparkOperToRDD(sparkPlan, pred, sparkOpRdds,
+                            physicalOpRdds, convertMap, seenJobIDs,
+                            sparkStats, conf);
+                }
+                predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+            }
+        }
+
+        List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
+        if (leafPOs != null && leafPOs.size() != 1) {
+            throw new IllegalArgumentException(String.format(
+                    "sparkOperator.physicalPlan should have 1 leaf, but it has %d, sparkOperator: %s",
+                    sparkOperator.physicalPlan.getLeaves().size(),
+                    sparkOperator.name()));
+        } else {
+            PhysicalOperator leafPO = leafPOs.get(0);
+            physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+                    predecessorRDDs, convertMap);
+            sparkOpRdds.put(sparkOperator.getOperatorKey(),
+                    physicalOpRdds.get(leafPO.getOperatorKey()));
+        }
+
+        List<POStore> poStores = PlanHelper.getPhysicalOperators(
+                sparkOperator.physicalPlan, POStore.class);
+        if (poStores != null && poStores.size() == 1) {
+            POStore poStore = poStores.get(0);
+            for (int jobID : getJobIDs(seenJobIDs)) {
+                SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+                        jobMetricsListener, sparkContext, sparkStats, conf);
+            }
+        } else {
+            LOG.info(String.format(
+                    "sparkOperator: %s does not have exactly one POStore; number of POStores: %d",
+                    sparkOperator.name(), poStores.size()));
+        }
+    }
+
+    private void physicalToRDD(PhysicalPlan plan,
+            PhysicalOperator physicalOperator,
+            Map<OperatorKey, RDD<Tuple>> rdds,
+            List<RDD<Tuple>> rddsFromPredeSparkOper,
+            Map<Class<? extends PhysicalOperator>, POConverter> convertMap)
+            throws IOException {
+        RDD<Tuple> nextRDD = null;
+        List<PhysicalOperator> predecessors = plan
+                .getPredecessors(physicalOperator);
+        List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
+        if (predecessors != null) {
+            for (PhysicalOperator predecessor : predecessors) {
+                physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
+                        convertMap);
+                predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
+            }
+        } else {
+            if (rddsFromPredeSparkOper != null
+                    && rddsFromPredeSparkOper.size() > 0) {
+                predecessorRdds.addAll(rddsFromPredeSparkOper);
+            }
+        }
+
+        POConverter converter = convertMap.get(physicalOperator.getClass());
+        if (converter == null) {
+            throw new IllegalArgumentException(
+                    "Spork unsupported PhysicalOperator: " + physicalOperator);
+        }
+
+        LOG.info("Converting operator "
+                + physicalOperator.getClass().getSimpleName() + " "
+                + physicalOperator);
+        nextRDD = converter.convert(predecessorRdds, physicalOperator);
+
+        if (nextRDD == null) {
+            throw new IllegalArgumentException(
+                    "RDD should not be null after PhysicalOperator: "
+                            + physicalOperator);
+        }
+
+        rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+    }
+
+    @Override
+    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+            String format, boolean verbose) throws IOException {
+        SparkOperPlan sparkPlan = compile(pp, pc);
+        ps.println("#-----------------------------------------------------#");
+        ps.println("# The Spark node relations are:");
+        ps.println("#-----------------------------------------------------#");
+        Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
+        List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
+        Collections.sort(operKeyList);
+        for (OperatorKey operatorKey : operKeyList) {
+            SparkOperator op = sparkPlan.getOperator(operatorKey);
+            ps.print(op.getOperatorKey());
+            List<SparkOperator> successors = sparkPlan.getSuccessors(op);
+            if (successors != null) {
+                ps.print("->");
+                for (SparkOperator suc : successors) {
+                    ps.print(suc.getOperatorKey() + " ");
+                }
+            }
+            ps.println();
+        }
+        if (format.equals("text")) {
+            SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else { // TODO: add support for other file format
+            throw new IOException("Non-text output of explain is not supported.");
+        }
+    }
+
+ @Override
+ public void kill() throws BackendException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void killJob(String jobID, Configuration conf)
+ throws BackendException {
+ // TODO Auto-generated method stub
- }
-}
\ No newline at end of file
+ }
+}
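
For context: the job-group technique that the getJobIDs() javadoc above describes reduces to the following standalone sketch. This is not part of the commit; the wrapper class and method names are hypothetical, while setJobGroup() and statusTracker().getJobIdsForGroup() are the JavaSparkContext API the launcher actually calls.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.commons.lang3.ArrayUtils;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JobGroupIdSketch {
        // Mirrors SparkLauncher.getJobIDs(): getJobIdsForGroup() gives no
        // ordering guarantee, so previously unseen job ids are found by set
        // difference against the ids already handled.
        public static Set<Integer> newJobIdsForGroup(JavaSparkContext sc,
                String jobGroupId, Set<Integer> seenJobIds) {
            // The group must be registered before any action runs, e.g.:
            // sc.setJobGroup(jobGroupId, "Pig query to Spark cluster", false);
            Set<Integer> groupJobIds = new HashSet<Integer>(Arrays.asList(
                    ArrayUtils.toObject(sc.statusTracker()
                            .getJobIdsForGroup(jobGroupId))));
            groupJobIds.removeAll(seenJobIds);
            seenJobIds.addAll(groupJobIds);
            return groupJobIds;
        }
    }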
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java?rev=1665404&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java Tue Mar 10 04:37:36 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.operator;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.OperatorKey;
+
+/**
+ * NativeSparkOperator: represents a native Spark job (a user-supplied jar and
+ * its parameters) as an operator in the Spark plan.
+ */
+public class NativeSparkOperator extends SparkOperator {
+ private static final long serialVersionUID = 1L;
+ private static int countJobs = 0;
+ private String nativeSparkJar;
+ private String[] params;
+ private String jobId;
+
+    public NativeSparkOperator(OperatorKey k, String sparkJar, String[] parameters) {
+ super(k);
+ nativeSparkJar = sparkJar;
+ params = parameters;
+ jobId = sparkJar + "_" + getJobNumber();
+ }
+
+ private static int getJobNumber() {
+ countJobs++;
+ return countJobs;
+ }
+}
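
For illustration: a minimal sketch of how the new NativeSparkOperator might be constructed. This is not part of the commit; the jar name and parameters are placeholders, while OperatorKey and NodeIdGenerator are Pig's existing plan utilities.

    import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
    import org.apache.pig.impl.plan.NodeIdGenerator;
    import org.apache.pig.impl.plan.OperatorKey;

    public class NativeSparkOperatorSketch {
        public static NativeSparkOperator build(String scope) {
            // Every operator in a Pig plan carries a plan-unique OperatorKey.
            OperatorKey key = new OperatorKey(scope,
                    NodeIdGenerator.getGenerator().getNextNodeId(scope));
            // The jar and its parameters would normally come from the script;
            // the values below are hypothetical.
            return new NativeSparkOperator(key, "nativespark.jar",
                    new String[] { "input", "output" });
        }
    }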