Author: xuefu
Date: Fri Jul 3 12:48:05 2015
New Revision: 1689010
URL: http://svn.apache.org/r1689010
Log:
PIG-4619: Cleanup: change the indent size of some files of pig on spark project from 2 to 4 space (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java Fri Jul 3 12:48:05 2015
@@ -21,6 +21,7 @@ package org.apache.pig.backend.hadoop.ex
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.executor.TaskMetrics;
@@ -49,150 +50,150 @@ import java.util.Set;
public class JobMetricsListener implements SparkListener {
- private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
+ private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
- private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
- private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
- private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
- private final Set<Integer> finishedJobIds = Sets.newHashSet();
-
- @Override
- public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
-
- }
-
- @Override
- public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
-
- }
-
- @Override
- public void onTaskStart(SparkListenerTaskStart taskStart) {
-
- }
-
- @Override
- public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
-
- }
-
- @Override
- public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
-
- }
-
- @Override
- public void onExecutorAdded(SparkListenerExecutorAdded executorAdded){
-
- }
-
- @Override
- public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
- int stageId = taskEnd.stageId();
- int stageAttemptId = taskEnd.stageAttemptId();
- String stageIdentifier = stageId + "_" + stageAttemptId;
- Integer jobId = stageIdToJobId.get(stageId);
- if (jobId == null) {
- LOG.warn("Cannot find job id for stage[" + stageId + "].");
- } else {
- Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
- if (jobMetrics == null) {
- jobMetrics = Maps.newHashMap();
- allJobMetrics.put(jobId, jobMetrics);
- }
- List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
- if (stageMetrics == null) {
- stageMetrics = Lists.newLinkedList();
- jobMetrics.put(stageIdentifier, stageMetrics);
- }
- stageMetrics.add(taskEnd.taskMetrics());
- }
- }
-
- @Override
- public synchronized void onJobStart(SparkListenerJobStart jobStart) {
- int jobId = jobStart.jobId();
- int size = jobStart.stageIds().size();
- int[] intStageIds = new int[size];
- for (int i = 0; i < size; i++) {
- Integer stageId = (Integer) jobStart.stageIds().apply(i);
- intStageIds[i] = stageId;
- stageIdToJobId.put(stageId, jobId);
- }
- jobIdToStageId.put(jobId, intStageIds);
- }
-
- @Override
- public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
- finishedJobIds.add(jobEnd.jobId());
- notify();
- }
-
- @Override
- public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
-
- }
-
- @Override
- public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
-
- }
-
- @Override
- public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
-
- }
-
- @Override
- public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
- }
-
- @Override
- public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
-
- }
-
- @Override
- public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
-
- }
-
- @Override
- public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
-
- }
-
- public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
- return allJobMetrics.get(jobId);
- }
-
- public synchronized boolean waitForJobToEnd(int jobId) throws InterruptedException {
- if (finishedJobIds.contains(jobId)) {
- finishedJobIds.remove(jobId);
- return true;
- }
-
- wait();
- return false;
- }
-
- public synchronized void cleanup(int jobId) {
- allJobMetrics.remove(jobId);
- jobIdToStageId.remove(jobId);
- Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Integer, Integer> entry = iterator.next();
- if (entry.getValue() == jobId) {
- iterator.remove();
- }
- }
- }
-
- public synchronized void reset() {
- stageIdToJobId.clear();
- jobIdToStageId.clear();
- allJobMetrics.clear();
- finishedJobIds.clear();
- }
+ private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+ private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+ private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+ private final Set<Integer> finishedJobIds = Sets.newHashSet();
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+ }
+
+ @Override
+ public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
+
+ }
+
+ @Override
+ public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+
+ }
+
+ @Override
+ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ int stageId = taskEnd.stageId();
+ int stageAttemptId = taskEnd.stageAttemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Cannot find job id for stage[" + stageId + "].");
+ } else {
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(taskEnd.taskMetrics());
+ }
+ }
+
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ int jobId = jobStart.jobId();
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for (int i = 0; i < size; i++) {
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
+ }
+ jobIdToStageId.put(jobId, intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+ finishedJobIds.add(jobEnd.jobId());
+ notify();
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+ }
+
+ public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+ return allJobMetrics.get(jobId);
+ }
+
+ public synchronized boolean waitForJobToEnd(int jobId) throws InterruptedException {
+ if (finishedJobIds.contains(jobId)) {
+ finishedJobIds.remove(jobId);
+ return true;
+ }
+
+ wait();
+ return false;
+ }
+
+ public synchronized void cleanup(int jobId) {
+ allJobMetrics.remove(jobId);
+ jobIdToStageId.remove(jobId);
+ Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Integer> entry = iterator.next();
+ if (entry.getValue() == jobId) {
+ iterator.remove();
+ }
+ }
+ }
+
+ public synchronized void reset() {
+ stageIdToJobId.clear();
+ jobIdToStageId.clear();
+ allJobMetrics.clear();
+ finishedJobIds.clear();
+ }
}
\ No newline at end of file
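
A note on the listener above: onJobEnd() notify()s a waiter, and waitForJobToEnd() returns false when it wakes up before its own job id has been recorded, so callers are expected to loop. A minimal caller sketch, assuming a listener already registered on the SparkContext; the helper name is hypothetical and not part of this commit:

    // Block until the listener marks the given Spark job as finished.
    // waitForJobToEnd() returns false on a wakeup for some other job,
    // so keep re-checking until our jobId shows up in finishedJobIds.
    static void blockUntilJobEnds(JobMetricsListener listener, int jobId)
            throws InterruptedException {
        while (!listener.waitForJobToEnd(jobId)) {
            // spurious or unrelated wakeup; loop and wait again
        }
    }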
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri Jul 3 12:48:05 2015
@@ -104,89 +104,89 @@ import org.apache.spark.SparkException;
*/
public class SparkLauncher extends Launcher {
- private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
+ private static final Log LOG = LogFactory.getLog(SparkLauncher.class);
- // Our connection to Spark. It needs to be static so that it can be reused
- // across jobs, because a
- // new SparkLauncher gets created for each job.
- private static JavaSparkContext sparkContext = null;
- private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
- private String jobGroupID;
+ // Our connection to Spark. It needs to be static so that it can be reused
+ // across jobs, because a
+ // new SparkLauncher gets created for each job.
+ private static JavaSparkContext sparkContext = null;
+ private static JobMetricsListener jobMetricsListener = new JobMetricsListener();
+ private String jobGroupID;
private PigContext pigContext = null;
private JobConf jobConf = null;
private String currentDirectoryPath = null;
- @Override
- public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
- PigContext pigContext) throws Exception {
- if (LOG.isDebugEnabled())
- LOG.debug(physicalPlan);
+ @Override
+ public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
+ PigContext pigContext) throws Exception {
+ if (LOG.isDebugEnabled())
+ LOG.debug(physicalPlan);
this.pigContext = pigContext;
- initialize();
- SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
- if (LOG.isDebugEnabled())
- explain(sparkplan, System.out, "text", true);
- SparkPigStats sparkStats = (SparkPigStats) pigContext
- .getExecutionEngine().instantiatePigStats();
+ initialize();
+ SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
+ if (LOG.isDebugEnabled())
+ explain(sparkplan, System.out, "text", true);
+ SparkPigStats sparkStats = (SparkPigStats) pigContext
+ .getExecutionEngine().instantiatePigStats();
sparkStats.initialize(sparkplan);
- PigStats.start(sparkStats);
+ PigStats.start(sparkStats);
- startSparkIfNeeded(pigContext);
+ startSparkIfNeeded(pigContext);
- // Set a unique group id for this query, so we can lookup all Spark job
- // ids
- // related to this query.
- jobGroupID = UUID.randomUUID().toString();
- sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
- false);
- jobMetricsListener.reset();
-
- this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
- .normalize().toString()
- + "/";
- addFilesToSparkJob();
- LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
- physicalPlan, POStore.class);
- POStore firstStore = stores.getFirst();
- if (firstStore != null) {
- MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
- jobConf);
- }
-
- new ParallelismSetter(sparkplan, jobConf).visit();
-
- byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-
- // Create conversion map, mapping between pig operator and spark convertor
- Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
- = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>();
- convertMap.put(POLoad.class, new LoadConverter(pigContext,
- physicalPlan, sparkContext.sc()));
- convertMap.put(POStore.class, new StoreConverter(pigContext));
- convertMap.put(POForEach.class, new ForEachConverter(confBytes));
- convertMap.put(POFilter.class, new FilterConverter());
- convertMap.put(POPackage.class, new PackageConverter(confBytes));
- convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
+ // Set a unique group id for this query, so we can lookup all Spark job
+ // ids
+ // related to this query.
+ jobGroupID = UUID.randomUUID().toString();
+ sparkContext.setJobGroup(jobGroupID, "Pig query to Spark cluster",
+ false);
+ jobMetricsListener.reset();
+
+ this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
+ .normalize().toString()
+ + "/";
+ addFilesToSparkJob();
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+ physicalPlan, POStore.class);
+ POStore firstStore = stores.getFirst();
+ if (firstStore != null) {
+ MapRedUtil.setupStreamingDirsConfSingle(firstStore, pigContext,
+ jobConf);
+ }
+
+ new ParallelismSetter(sparkplan, jobConf).visit();
+
+ byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+
+ // Create conversion map, mapping between pig operator and spark convertor
+ Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
+ = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>();
+ convertMap.put(POLoad.class, new LoadConverter(pigContext,
+ physicalPlan, sparkContext.sc()));
+ convertMap.put(POStore.class, new StoreConverter(pigContext));
+ convertMap.put(POForEach.class, new ForEachConverter(confBytes));
+ convertMap.put(POFilter.class, new FilterConverter());
+ convertMap.put(POPackage.class, new PackageConverter(confBytes));
+ convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
convertMap.put(POGlobalRearrangeSpark.class, new GlobalRearrangeConverter());
convertMap.put(POLimit.class, new LimitConverter());
convertMap.put(PODistinct.class, new DistinctConverter());
- convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
- convertMap.put(POSort.class, new SortConverter());
- convertMap.put(POSplit.class, new SplitConverter());
- convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
- convertMap.put(POMergeJoin.class, new MergeJoinConverter());
- convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
- convertMap.put(POCounter.class, new CounterConverter());
- convertMap.put(PORank.class, new RankConverter());
- convertMap.put(POStream.class, new StreamConverter(confBytes));
- convertMap.put(POFRJoin.class, new FRJoinConverter());
-
- sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
- cleanUpSparkJob();
- sparkStats.finish();
+ convertMap.put(POUnion.class, new UnionConverter(sparkContext.sc()));
+ convertMap.put(POSort.class, new SortConverter());
+ convertMap.put(POSplit.class, new SplitConverter());
+ convertMap.put(POSkewedJoin.class, new SkewedJoinConverter());
+ convertMap.put(POMergeJoin.class, new MergeJoinConverter());
+ convertMap.put(POCollectedGroup.class, new CollectedGroupConverter());
+ convertMap.put(POCounter.class, new CounterConverter());
+ convertMap.put(PORank.class, new RankConverter());
+ convertMap.put(POStream.class, new StreamConverter(confBytes));
+ convertMap.put(POFRJoin.class, new FRJoinConverter());
+
+ sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
+ cleanUpSparkJob();
+ sparkStats.finish();
- return sparkStats;
- }
+ return sparkStats;
+ }
private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
@@ -203,122 +203,121 @@ public class SparkLauncher extends Launc
}
}
- /**
- * In Spark, currently only async actions return job id. There is no async
- * equivalent of actions like saveAsNewAPIHadoopFile()
- *
- * The only other way to get a job id is to register a "job group ID" with
- * the spark context and request all job ids corresponding to that job group
- * via getJobIdsForGroup.
- *
- * However getJobIdsForGroup does not guarantee the order of the elements in
- * it's result.
- *
- * This method simply returns the previously unseen job ids.
- *
- * @param seenJobIDs
- * job ids in the job group that are already seen
- * @return Spark job ids not seen before
- */
- private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
- Set<Integer> groupjobIDs = new HashSet<Integer>(
- Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
- .getJobIdsForGroup(jobGroupID))));
- groupjobIDs.removeAll(seenJobIDs);
- List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
- if (unseenJobIDs.size() == 0) {
- throw new RuntimeException("Expected at least one unseen jobID "
- + " in this call to getJobIdsForGroup, but got "
- + unseenJobIDs.size());
- }
-
- seenJobIDs.addAll(unseenJobIDs);
- return unseenJobIDs;
- }
-
- private void cleanUpSparkJob() {
- LOG.info("clean up Spark Job");
- boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
- .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
- if (isLocal) {
- String shipFiles = pigContext.getProperties().getProperty(
- "pig.streaming.ship.files");
- if (shipFiles != null) {
- for (String file : shipFiles.split(",")) {
- File shipFile = new File(file);
- File deleteFile = new File(currentDirectoryPath + "/"
- + shipFile.getName());
- if (deleteFile.exists()) {
- LOG.info(String.format("delete ship file result: %b",
- deleteFile.delete()));
- }
- }
- }
- String cacheFiles = pigContext.getProperties().getProperty(
- "pig.streaming.cache.files");
- if (cacheFiles != null) {
- for (String file : cacheFiles.split(",")) {
- String fileName = extractFileName(file.trim());
- File deleteFile = new File(currentDirectoryPath + "/"
- + fileName);
- if (deleteFile.exists()) {
- LOG.info(String.format("delete cache file result: %b",
- deleteFile.delete()));
- }
- }
- }
- }
- }
-
- private void addFilesToSparkJob() throws IOException {
- LOG.info("add Files Spark Job");
- String shipFiles = pigContext.getProperties().getProperty(
- "pig.streaming.ship.files");
- shipFiles(shipFiles);
- String cacheFiles = pigContext.getProperties().getProperty(
- "pig.streaming.cache.files");
- cacheFiles(cacheFiles);
- }
-
-
- private void shipFiles(String shipFiles)
- throws IOException {
- if (shipFiles != null) {
- for (String file : shipFiles.split(",")) {
- File shipFile = new File(file.trim());
- if (shipFile.exists()) {
- LOG.info(String.format("shipFile:%s", shipFile));
- addJarToSparkJobWorkingDirectory(shipFile,shipFile.getName());
- }
- }
- }
- }
-
- private void cacheFiles(String cacheFiles) throws IOException {
- if (cacheFiles != null) {
- Configuration conf = SparkUtil.newJobConf(pigContext);
- for (String file : cacheFiles.split(",")) {
- String fileName = extractFileName(file.trim());
- Path src = new Path(extractFileUrl(file.trim()));
- File tmpFile = File.createTempFile(fileName, ".tmp");
- Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
- FileSystem fs = tmpFilePath.getFileSystem(conf);
- fs.copyToLocalFile(src, tmpFilePath);
- tmpFile.deleteOnExit();
+ /**
+ * In Spark, currently only async actions return job id. There is no async
+ * equivalent of actions like saveAsNewAPIHadoopFile()
+ * <p/>
+ * The only other way to get a job id is to register a "job group ID" with
+ * the spark context and request all job ids corresponding to that job group
+ * via getJobIdsForGroup.
+ * <p/>
+ * However getJobIdsForGroup does not guarantee the order of the elements in
+ * it's result.
+ * <p/>
+ * This method simply returns the previously unseen job ids.
+ *
+ * @param seenJobIDs job ids in the job group that are already seen
+ * @return Spark job ids not seen before
+ */
+ private List<Integer> getJobIDs(Set<Integer> seenJobIDs) {
+ Set<Integer> groupjobIDs = new HashSet<Integer>(
+ Arrays.asList(ArrayUtils.toObject(sparkContext.statusTracker()
+ .getJobIdsForGroup(jobGroupID))));
+ groupjobIDs.removeAll(seenJobIDs);
+ List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
+ if (unseenJobIDs.size() == 0) {
+ throw new RuntimeException("Expected at least one unseen jobID "
+ + " in this call to getJobIdsForGroup, but got "
+ + unseenJobIDs.size());
+ }
+
+ seenJobIDs.addAll(unseenJobIDs);
+ return unseenJobIDs;
+ }
+
+ private void cleanUpSparkJob() {
+ LOG.info("clean up Spark Job");
+ boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+ .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+ if (isLocal) {
+ String shipFiles = pigContext.getProperties().getProperty(
+ "pig.streaming.ship.files");
+ if (shipFiles != null) {
+ for (String file : shipFiles.split(",")) {
+ File shipFile = new File(file);
+ File deleteFile = new File(currentDirectoryPath + "/"
+ + shipFile.getName());
+ if (deleteFile.exists()) {
+ LOG.info(String.format("delete ship file result: %b",
+ deleteFile.delete()));
+ }
+ }
+ }
+ String cacheFiles = pigContext.getProperties().getProperty(
+ "pig.streaming.cache.files");
+ if (cacheFiles != null) {
+ for (String file : cacheFiles.split(",")) {
+ String fileName = extractFileName(file.trim());
+ File deleteFile = new File(currentDirectoryPath + "/"
+ + fileName);
+ if (deleteFile.exists()) {
+ LOG.info(String.format("delete cache file result: %b",
+ deleteFile.delete()));
+ }
+ }
+ }
+ }
+ }
+
+ private void addFilesToSparkJob() throws IOException {
+ LOG.info("add Files Spark Job");
+ String shipFiles = pigContext.getProperties().getProperty(
+ "pig.streaming.ship.files");
+ shipFiles(shipFiles);
+ String cacheFiles = pigContext.getProperties().getProperty(
+ "pig.streaming.cache.files");
+ cacheFiles(cacheFiles);
+ }
+
+
+ private void shipFiles(String shipFiles)
+ throws IOException {
+ if (shipFiles != null) {
+ for (String file : shipFiles.split(",")) {
+ File shipFile = new File(file.trim());
+ if (shipFile.exists()) {
+ LOG.info(String.format("shipFile:%s", shipFile));
+ addJarToSparkJobWorkingDirectory(shipFile, shipFile.getName());
+ }
+ }
+ }
+ }
+
+ private void cacheFiles(String cacheFiles) throws IOException {
+ if (cacheFiles != null) {
+ Configuration conf = SparkUtil.newJobConf(pigContext);
+ for (String file : cacheFiles.split(",")) {
+ String fileName = extractFileName(file.trim());
+ Path src = new Path(extractFileUrl(file.trim()));
+ File tmpFile = File.createTempFile(fileName, ".tmp");
+ Path tmpFilePath = new Path(tmpFile.getAbsolutePath());
+ FileSystem fs = tmpFilePath.getFileSystem(conf);
+ fs.copyToLocalFile(src, tmpFilePath);
+ tmpFile.deleteOnExit();
LOG.info(String.format("cacheFile:%s", fileName));
- addJarToSparkJobWorkingDirectory(tmpFile, fileName);
- }
- }
- }
+ addJarToSparkJobWorkingDirectory(tmpFile, fileName);
+ }
+ }
+ }
private void addJarToSparkJobWorkingDirectory(File jarFile, String jarName) throws IOException {
- LOG.info("Added jar "+jarName);
+ LOG.info("Added jar " + jarName);
boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
if (isLocal) {
File localFile = new File(currentDirectoryPath + "/"
+ jarName);
- if (jarFile.getAbsolutePath().equals(localFile.getAbsolutePath())
+ if (jarFile.getAbsolutePath().equals(localFile.getAbsolutePath())
&& jarFile.exists()) {
return;
}
@@ -339,119 +338,119 @@ public class SparkLauncher extends Launc
}
}
- private String extractFileName(String cacheFileUrl) {
- String[] tmpAry = cacheFileUrl.split("#");
- String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1]
- : null;
- if (fileName == null) {
- throw new RuntimeException("cache file is invalid format, file:"
- + cacheFileUrl);
- } else {
- LOG.debug("cache file name is valid:" + cacheFileUrl);
- return fileName;
- }
- }
-
- private String extractFileUrl(String cacheFileUrl) {
- String[] tmpAry = cacheFileUrl.split("#");
- String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0]
- : null;
- if (fileName == null) {
- throw new RuntimeException("cache file is invalid format, file:"
- + cacheFileUrl);
- } else {
- LOG.debug("cache file name is valid:" + cacheFileUrl);
- return fileName;
- }
- }
-
- private SparkOperPlan compile(PhysicalPlan physicalPlan,
- PigContext pigContext) throws PlanException, IOException,
- VisitorException {
- SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
- pigContext);
- sparkCompiler.compile();
- SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
-
- // optimize key - value handling in package
- SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
- sparkPlan);
- pkgAnnotator.visit();
-
- optimize(pigContext, sparkPlan);
- return sparkPlan;
- }
-
- private static void startSparkIfNeeded(PigContext pc) throws PigException {
- if (sparkContext == null) {
- String master = null;
- if (pc.getExecType().isLocal()) {
- master = "local";
- } else {
- master = System.getenv("SPARK_MASTER");
- if (master == null) {
- LOG.info("SPARK_MASTER not specified, using \"local\"");
- master = "local";
- }
- }
-
- String sparkHome = System.getenv("SPARK_HOME");
- String sparkJarsSetting = System.getenv("SPARK_JARS");
- String pigJar = System.getenv("SPARK_PIG_JAR");
- String[] sparkJars = sparkJarsSetting == null ? new String[] {}
- : sparkJarsSetting.split(",");
- List<String> jars = Lists.asList(pigJar, sparkJars);
-
- if (!master.startsWith("local") && !master.equals("yarn-client")) {
- // Check that we have the Mesos native library and Spark home
- // are set
- if (sparkHome == null) {
- System.err
- .println("You need to set SPARK_HOME to run on a Mesos cluster!");
- throw new PigException("SPARK_HOME is not set");
- }
- }
-
- sparkContext = new JavaSparkContext(master, "PigOnSpark", sparkHome,
- jars.toArray(new String[jars.size()]));
- sparkContext.sc().addSparkListener(new StatsReportListener());
- sparkContext.sc().addSparkListener(new JobLogger());
- sparkContext.sc().addSparkListener(jobMetricsListener);
- }
- }
-
- // You can use this in unit tests to stop the SparkContext between tests.
- static void stopSpark() {
- if (sparkContext != null) {
- sparkContext.stop();
- sparkContext = null;
- }
- }
-
- private void sparkPlanToRDD(SparkOperPlan sparkPlan,
- Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
- SparkPigStats sparkStats, JobConf jobConf)
- throws IOException, InterruptedException {
- Set<Integer> seenJobIDs = new HashSet<Integer>();
- if (sparkPlan == null) {
- throw new RuntimeException("SparkPlan is null.");
- }
-
- List<SparkOperator> leaves = sparkPlan.getLeaves();
- Collections.sort(leaves);
- Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Converting " + leaves.size() + " Spark Operators to RDDs");
- }
-
- for (SparkOperator leaf : leaves) {
- new PhyPlanSetter(leaf.physicalPlan).visit();
- Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
- sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
- physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
- jobConf);
- }
- }
+ private String extractFileName(String cacheFileUrl) {
+ String[] tmpAry = cacheFileUrl.split("#");
+ String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1]
+ : null;
+ if (fileName == null) {
+ throw new RuntimeException("cache file is invalid format, file:"
+ + cacheFileUrl);
+ } else {
+ LOG.debug("cache file name is valid:" + cacheFileUrl);
+ return fileName;
+ }
+ }
+
+ private String extractFileUrl(String cacheFileUrl) {
+ String[] tmpAry = cacheFileUrl.split("#");
+ String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0]
+ : null;
+ if (fileName == null) {
+ throw new RuntimeException("cache file is invalid format, file:"
+ + cacheFileUrl);
+ } else {
+ LOG.debug("cache file name is valid:" + cacheFileUrl);
+ return fileName;
+ }
+ }
+
+ private SparkOperPlan compile(PhysicalPlan physicalPlan,
+ PigContext pigContext) throws PlanException, IOException,
+ VisitorException {
+ SparkCompiler sparkCompiler = new SparkCompiler(physicalPlan,
+ pigContext);
+ sparkCompiler.compile();
+ SparkOperPlan sparkPlan = sparkCompiler.getSparkPlan();
+
+ // optimize key - value handling in package
+ SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
+ sparkPlan);
+ pkgAnnotator.visit();
+
+ optimize(pigContext, sparkPlan);
+ return sparkPlan;
+ }
+
+ private static void startSparkIfNeeded(PigContext pc) throws PigException {
+ if (sparkContext == null) {
+ String master = null;
+ if (pc.getExecType().isLocal()) {
+ master = "local";
+ } else {
+ master = System.getenv("SPARK_MASTER");
+ if (master == null) {
+ LOG.info("SPARK_MASTER not specified, using \"local\"");
+ master = "local";
+ }
+ }
+
+ String sparkHome = System.getenv("SPARK_HOME");
+ String sparkJarsSetting = System.getenv("SPARK_JARS");
+ String pigJar = System.getenv("SPARK_PIG_JAR");
+ String[] sparkJars = sparkJarsSetting == null ? new String[]{}
+ : sparkJarsSetting.split(",");
+ List<String> jars = Lists.asList(pigJar, sparkJars);
+
+ if (!master.startsWith("local") && !master.equals("yarn-client")) {
+ // Check that we have the Mesos native library and Spark home
+ // are set
+ if (sparkHome == null) {
+ System.err
+ .println("You need to set SPARK_HOME to run on a Mesos cluster!");
+ throw new PigException("SPARK_HOME is not set");
+ }
+ }
+
+ sparkContext = new JavaSparkContext(master, "PigOnSpark", sparkHome,
+ jars.toArray(new String[jars.size()]));
+ sparkContext.sc().addSparkListener(new StatsReportListener());
+ sparkContext.sc().addSparkListener(new JobLogger());
+ sparkContext.sc().addSparkListener(jobMetricsListener);
+ }
+ }
+
+ // You can use this in unit tests to stop the SparkContext between tests.
+ static void stopSpark() {
+ if (sparkContext != null) {
+ sparkContext.stop();
+ sparkContext = null;
+ }
+ }
+
+ private void sparkPlanToRDD(SparkOperPlan sparkPlan,
+ Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
+ SparkPigStats sparkStats, JobConf jobConf)
+ throws IOException, InterruptedException {
+ Set<Integer> seenJobIDs = new HashSet<Integer>();
+ if (sparkPlan == null) {
+ throw new RuntimeException("SparkPlan is null.");
+ }
+
+ List<SparkOperator> leaves = sparkPlan.getLeaves();
+ Collections.sort(leaves);
+ Map<OperatorKey, RDD<Tuple>> sparkOpToRdds = new HashMap();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting " + leaves.size() + " Spark Operators to RDDs");
+ }
+
+ for (SparkOperator leaf : leaves) {
+ new PhyPlanSetter(leaf.physicalPlan).visit();
+ Map<OperatorKey, RDD<Tuple>> physicalOpToRdds = new HashMap();
+ sparkOperToRDD(sparkPlan, leaf, sparkOpToRdds,
+ physicalOpToRdds, convertMap, seenJobIDs, sparkStats,
+ jobConf);
+ }
+ }
private void addUDFJarsToSparkJobWorkingDirectory(SparkOperator leaf)
throws IOException {
@@ -459,7 +458,7 @@ public class SparkLauncher extends Launc
Class clazz = pigContext.getClassForAlias(udf);
if (clazz != null) {
String jar = JarManager.findContainingJar(clazz);
- if( jar != null) {
+ if (jar != null) {
File jarFile = new File(jar);
addJarToSparkJobWorkingDirectory(jarFile, jarFile.getName());
}
@@ -468,60 +467,60 @@ public class SparkLauncher extends Launc
}
private void sparkOperToRDD(SparkOperPlan sparkPlan,
- SparkOperator sparkOperator,
- Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
- Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
- Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
- Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf conf)
- throws IOException, InterruptedException {
+ SparkOperator sparkOperator,
+ Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
+ Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
+ Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
+ Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf conf)
+ throws IOException, InterruptedException {
addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
- List<SparkOperator> predecessors = sparkPlan
- .getPredecessors(sparkOperator);
- List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
- if (predecessors != null) {
- for (SparkOperator pred : predecessors) {
- if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
- sparkOperToRDD(sparkPlan, pred, sparkOpRdds,
- physicalOpRdds, convertMap, seenJobIDs, sparkStats,
- conf);
- }
- predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
- }
- }
-
- List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
- boolean isFail = false;
- Exception exception = null;
- if (leafPOs != null && leafPOs.size() != 1) {
- throw new IllegalArgumentException(
- String.format(
- "sparkOperator "
- + ".physicalPlan should have 1 leaf, but sparkOperator"
- + ".physicalPlan.getLeaves():{} not equals 1, sparkOperator"
- + "sparkOperator:{}",
- sparkOperator.physicalPlan.getLeaves().size(),
- sparkOperator.name()));
- }
-
- PhysicalOperator leafPO = leafPOs.get(0);
- try {
- physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
- predecessorRDDs, convertMap);
- sparkOpRdds.put(sparkOperator.getOperatorKey(),
- physicalOpRdds.get(leafPO.getOperatorKey()));
- } catch(Exception e) {
- if( e instanceof SparkException) {
- LOG.info("throw SparkException, error founds when running " +
- "rdds in spark");
- }
- exception = e;
- isFail = true;
- }
-
- List<POStore> poStores = PlanHelper.getPhysicalOperators(
- sparkOperator.physicalPlan, POStore.class);
- if (poStores != null && poStores.size() == 1) {
- POStore poStore = poStores.get(0);
+ List<SparkOperator> predecessors = sparkPlan
+ .getPredecessors(sparkOperator);
+ List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
+ if (predecessors != null) {
+ for (SparkOperator pred : predecessors) {
+ if (sparkOpRdds.get(pred.getOperatorKey()) == null) {
+ sparkOperToRDD(sparkPlan, pred, sparkOpRdds,
+ physicalOpRdds, convertMap, seenJobIDs, sparkStats,
+ conf);
+ }
+ predecessorRDDs.add(sparkOpRdds.get(pred.getOperatorKey()));
+ }
+ }
+
+ List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
+ boolean isFail = false;
+ Exception exception = null;
+ if (leafPOs != null && leafPOs.size() != 1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "sparkOperator "
+ + ".physicalPlan should have 1 leaf, but sparkOperator"
+ + ".physicalPlan.getLeaves():{} not equals 1, sparkOperator"
+ + "sparkOperator:{}",
+ sparkOperator.physicalPlan.getLeaves().size(),
+ sparkOperator.name()));
+ }
+
+ PhysicalOperator leafPO = leafPOs.get(0);
+ try {
+ physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+ predecessorRDDs, convertMap);
+ sparkOpRdds.put(sparkOperator.getOperatorKey(),
+ physicalOpRdds.get(leafPO.getOperatorKey()));
+ } catch (Exception e) {
+ if (e instanceof SparkException) {
+ LOG.info("throw SparkException, error founds when running " +
+ "rdds in spark");
+ }
+ exception = e;
+ isFail = true;
+ }
+
+ List<POStore> poStores = PlanHelper.getPhysicalOperators(
+ sparkOperator.physicalPlan, POStore.class);
+ if (poStores != null && poStores.size() == 1) {
+ POStore poStore = poStores.get(0);
if (!isFail) {
for (int jobID : getJobIDs(seenJobIDs)) {
SparkStatsUtil.waitForJobAddStats(jobID, poStore, sparkOperator,
@@ -533,19 +532,19 @@ public class SparkLauncher extends Launc
conf, exception);
}
} else {
- LOG.info(String
- .format(String.format("sparkOperator:{} does not have POStore or" +
- " sparkOperator has more than 1 POStore. {} is the size of POStore."),
- sparkOperator.name(), poStores.size()));
- }
- }
-
- private void physicalToRDD(PhysicalPlan plan,
- PhysicalOperator physicalOperator,
- Map<OperatorKey, RDD<Tuple>> rdds,
- List<RDD<Tuple>> rddsFromPredeSparkOper,
- Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap)
- throws IOException {
+ LOG.info(String
+ .format(String.format("sparkOperator:{} does not have POStore or" +
+ " sparkOperator has more than 1 POStore. {} is the size of POStore."),
+ sparkOperator.name(), poStores.size()));
+ }
+ }
+
+ private void physicalToRDD(PhysicalPlan plan,
+ PhysicalOperator physicalOperator,
+ Map<OperatorKey, RDD<Tuple>> rdds,
+ List<RDD<Tuple>> rddsFromPredeSparkOper,
+ Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap)
+ throws IOException {
RDD<Tuple> nextRDD = null;
List<PhysicalOperator> predecessors = plan
.getPredecessors(physicalOperator);
@@ -554,88 +553,88 @@ public class SparkLauncher extends Launc
}
List<RDD<Tuple>> predecessorRdds = Lists.newArrayList();
- if (predecessors != null) {
- for (PhysicalOperator predecessor : predecessors) {
- physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
- convertMap);
- predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
- }
-
- } else {
- if (rddsFromPredeSparkOper != null
- && rddsFromPredeSparkOper.size() > 0) {
- predecessorRdds.addAll(rddsFromPredeSparkOper);
- }
- }
-
- RDDConverter converter = convertMap.get(physicalOperator.getClass());
- if (converter == null) {
- throw new IllegalArgumentException(
- "Pig on Spark does not support Physical Operator: " + physicalOperator);
- }
-
- LOG.info("Converting operator "
- + physicalOperator.getClass().getSimpleName() + " "
- + physicalOperator);
- nextRDD = converter.convert(predecessorRdds, physicalOperator);
-
- if (nextRDD == null) {
- throw new IllegalArgumentException(
- "RDD should not be null after PhysicalOperator: "
- + physicalOperator);
- }
-
- rdds.put(physicalOperator.getOperatorKey(), nextRDD);
- }
-
- @Override
- public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
- String format, boolean verbose) throws IOException {
- SparkOperPlan sparkPlan = compile(pp, pc);
- explain(sparkPlan, ps, format, verbose);
- }
-
- private void explain(SparkOperPlan sparkPlan, PrintStream ps,
- String format, boolean verbose)
- throws IOException {
- Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
- List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
- Collections.sort(operKeyList);
- for (OperatorKey operatorKey : operKeyList) {
- SparkOperator op = sparkPlan.getOperator(operatorKey);
- ps.print(op.getOperatorKey());
- List<SparkOperator> successors = sparkPlan.getSuccessors(op);
- if (successors != null) {
- ps.print("->");
- for (SparkOperator suc : successors) {
- ps.print(suc.getOperatorKey() + " ");
- }
- }
- ps.println();
- }
-
- if (format.equals("text")) {
- SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
- printer.setVerbose(verbose);
- printer.visit();
- } else { // TODO: add support for other file format
- throw new IOException(
- "Non-text output of explain is not supported.");
- }
- }
-
- @Override
- public void kill() throws BackendException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void killJob(String jobID, Configuration conf)
- throws BackendException {
- // TODO Auto-generated method stub
+ if (predecessors != null) {
+ for (PhysicalOperator predecessor : predecessors) {
+ physicalToRDD(plan, predecessor, rdds, rddsFromPredeSparkOper,
+ convertMap);
+ predecessorRdds.add(rdds.get(predecessor.getOperatorKey()));
+ }
- }
+ } else {
+ if (rddsFromPredeSparkOper != null
+ && rddsFromPredeSparkOper.size() > 0) {
+ predecessorRdds.addAll(rddsFromPredeSparkOper);
+ }
+ }
+
+ RDDConverter converter = convertMap.get(physicalOperator.getClass());
+ if (converter == null) {
+ throw new IllegalArgumentException(
+ "Pig on Spark does not support Physical Operator: " + physicalOperator);
+ }
+
+ LOG.info("Converting operator "
+ + physicalOperator.getClass().getSimpleName() + " "
+ + physicalOperator);
+ nextRDD = converter.convert(predecessorRdds, physicalOperator);
+
+ if (nextRDD == null) {
+ throw new IllegalArgumentException(
+ "RDD should not be null after PhysicalOperator: "
+ + physicalOperator);
+ }
+
+ rdds.put(physicalOperator.getOperatorKey(), nextRDD);
+ }
+
+ @Override
+ public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
+ String format, boolean verbose) throws IOException {
+ SparkOperPlan sparkPlan = compile(pp, pc);
+ explain(sparkPlan, ps, format, verbose);
+ }
+
+ private void explain(SparkOperPlan sparkPlan, PrintStream ps,
+ String format, boolean verbose)
+ throws IOException {
+ Map<OperatorKey, SparkOperator> allOperKeys = sparkPlan.getKeys();
+ List<OperatorKey> operKeyList = new ArrayList(allOperKeys.keySet());
+ Collections.sort(operKeyList);
+ for (OperatorKey operatorKey : operKeyList) {
+ SparkOperator op = sparkPlan.getOperator(operatorKey);
+ ps.print(op.getOperatorKey());
+ List<SparkOperator> successors = sparkPlan.getSuccessors(op);
+ if (successors != null) {
+ ps.print("->");
+ for (SparkOperator suc : successors) {
+ ps.print(suc.getOperatorKey() + " ");
+ }
+ }
+ ps.println();
+ }
+
+ if (format.equals("text")) {
+ SparkPrinter printer = new SparkPrinter(ps, sparkPlan);
+ printer.setVerbose(verbose);
+ printer.visit();
+ } else { // TODO: add support for other file format
+ throw new IOException(
+ "Non-text output of explain is not supported.");
+ }
+ }
+
+ @Override
+ public void kill() throws BackendException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void killJob(String jobID, Configuration conf)
+ throws BackendException {
+ // TODO Auto-generated method stub
+
+ }
/**
* We store the value of udf.import.list in PigContext#properties.getProperty("spark.udf.import.list") in spark mode.
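
A note on the job-group technique that the getJobIDs() javadoc above describes: synchronous Spark actions return no job id, so the launcher tags all jobs with a group id and queries the status tracker afterwards. A minimal sketch against the plain Spark 1.x Java API, assuming a live JavaSparkContext named sc and a JavaRDD<String> named rdd (both hypothetical here, as is the output path):

    import java.util.UUID;

    // Tag every job submitted from this thread with a fresh group id.
    String jobGroupID = UUID.randomUUID().toString();
    sc.setJobGroup(jobGroupID, "Pig query to Spark cluster", false);

    // A synchronous action: it runs a job but returns no job id.
    rdd.saveAsTextFile("/tmp/pig-spark-out");

    // Ask the status tracker which job ids belong to the group. The order
    // is not guaranteed, which is why SparkLauncher tracks seenJobIDs.
    int[] jobIds = sc.statusTracker().getJobIdsForGroup(jobGroupID);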
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java Fri Jul 3 12:48:05 2015
@@ -25,25 +25,25 @@ import java.util.Properties;
*/
public class SparkLocalExecType extends SparkExecType {
- private static final long serialVersionUID = 1L;
- private static final String mode ="SPARK_LOCAL";
+ private static final long serialVersionUID = 1L;
+ private static final String mode = "SPARK_LOCAL";
- @Override
- public boolean accepts(Properties properties) {
- String execTypeSpecified = properties.getProperty("exectype", "")
- .toUpperCase();
- if (execTypeSpecified.equals(mode))
- return true;
- return false;
- }
+ @Override
+ public boolean accepts(Properties properties) {
+ String execTypeSpecified = properties.getProperty("exectype", "")
+ .toUpperCase();
+ if (execTypeSpecified.equals(mode))
+ return true;
+ return false;
+ }
- @Override
- public boolean isLocal() {
- return true;
- }
+ @Override
+ public boolean isLocal() {
+ return true;
+ }
- @Override
- public String name() {
- return "SPARK_LOCAL";
- }
+ @Override
+ public String name() {
+ return "SPARK_LOCAL";
+ }
}
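
A note on accepts() above: it upper-cases the exectype property before comparing, so any casing of "spark_local" selects this exec type. A minimal sketch, assuming SparkLocalExecType can be instantiated with its no-arg constructor:

    import java.util.Properties;

    Properties props = new Properties();
    props.setProperty("exectype", "spark_local");  // case-insensitive match
    boolean isSparkLocal = new SparkLocalExecType().accepts(props);  // true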
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1689010&r1=1689009&r2=1689010&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java Fri Jul 3 12:48:05 2015
@@ -29,60 +29,60 @@ import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
-@SuppressWarnings({ "serial"})
+@SuppressWarnings({"serial"})
public class CollectedGroupConverter implements RDDConverter<Tuple, Tuple, POCollectedGroup> {
- @Override
- public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
- POCollectedGroup physicalOperator) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
- RDD<Tuple> rdd = predecessors.get(0);
- CollectedGroupFunction collectedGroupFunction
- = new CollectedGroupFunction(physicalOperator);
- return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
- .rdd();
- }
-
- private static class CollectedGroupFunction
- implements FlatMapFunction<Iterator<Tuple>, Tuple> {
-
- private POCollectedGroup poCollectedGroup;
-
- public long current_val;
- public boolean proceed;
-
- private CollectedGroupFunction(POCollectedGroup poCollectedGroup) {
- this.poCollectedGroup = poCollectedGroup;
- this.current_val = 0;
- }
-
- public Iterable<Tuple> call(final Iterator<Tuple> input) {
-
- return new Iterable<Tuple>() {
-
- @Override
- public Iterator<Tuple> iterator() {
-
- return new OutputConsumerIterator(input) {
-
- @Override
- protected void attach(Tuple tuple) {
- poCollectedGroup.setInputs(null);
- poCollectedGroup.attachInput(tuple);
- }
-
- @Override
- protected Result getNextResult() throws ExecException {
- return poCollectedGroup.getNextTuple();
- }
-
- @Override
- protected void endOfInput() {
- poCollectedGroup.getParentPlan().endOfAllInput = true;
- }
- };
- }
- };
- }
- }
+ @Override
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
+ POCollectedGroup physicalOperator) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+ RDD<Tuple> rdd = predecessors.get(0);
+ CollectedGroupFunction collectedGroupFunction
+ = new CollectedGroupFunction(physicalOperator);
+ return rdd.toJavaRDD().mapPartitions(collectedGroupFunction, true)
+ .rdd();
+ }
+
+ private static class CollectedGroupFunction
+ implements FlatMapFunction<Iterator<Tuple>, Tuple> {
+
+ private POCollectedGroup poCollectedGroup;
+
+ public long current_val;
+ public boolean proceed;
+
+ private CollectedGroupFunction(POCollectedGroup poCollectedGroup) {
+ this.poCollectedGroup = poCollectedGroup;
+ this.current_val = 0;
+ }
+
+ public Iterable<Tuple> call(final Iterator<Tuple> input) {
+
+ return new Iterable<Tuple>() {
+
+ @Override
+ public Iterator<Tuple> iterator() {
+
+ return new OutputConsumerIterator(input) {
+
+ @Override
+ protected void attach(Tuple tuple) {
+ poCollectedGroup.setInputs(null);
+ poCollectedGroup.attachInput(tuple);
+ }
+
+ @Override
+ protected Result getNextResult() throws ExecException {
+ return poCollectedGroup.getNextTuple();
+ }
+
+ @Override
+ protected void endOfInput() {
+ poCollectedGroup.getParentPlan().endOfAllInput = true;
+ }
+ };
+ }
+ };
+ }
+ }
}
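
A note on the converter above: it adapts a pull-based Pig operator to Spark by wrapping each partition's iterator in a FlatMapFunction and draining it through mapPartitions. The same shape with the Pig operator swapped for a trivial transformation, as a sketch against the Spark 1.x Java API (where FlatMapFunction.call returns an Iterable, as in the diff):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.function.FlatMapFunction;

    class UpperCasePartition implements FlatMapFunction<Iterator<String>, String> {
        @Override
        public Iterable<String> call(Iterator<String> input) {
            // Drain the partition's iterator, transforming each element;
            // CollectedGroupFunction does this lazily via OutputConsumerIterator.
            List<String> out = new ArrayList<String>();
            while (input.hasNext()) {
                out.add(input.next().toUpperCase());
            }
            return out;
        }
    }

    // usage: javaRdd.mapPartitions(new UpperCasePartition(), true).rdd()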