Author: xuefu
Date: Thu Jun 4 05:46:07 2015
New Revision: 1683465
URL: http://svn.apache.org/r1683465
Log:
Enable unit test TestGrunt for Spark (Liyun via Xuefu)
Modified:
pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
Modified:
pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java?rev=1683465&r1=1683464&r2=1683465&view=diff
==============================================================================
---
pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java
(original)
+++
pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java
Thu Jun 4 05:46:07 2015
@@ -3,15 +3,32 @@ package org.apache.pig.test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
public class SparkMiniCluster extends MiniGenericCluster {
private static final File CONF_DIR = new File("build/classes");
- private static final File CONF_FILE = new File(CONF_DIR,
"hadoop-site.xml");
+ private static final File CORE_CONF_FILE = new File(CONF_DIR,
"core-site.xml");
+ private static final File HDFS_CONF_FILE = new File(CONF_DIR,
"hdfs-site.xml");
+ private static final File MAPRED_CONF_FILE = new File(CONF_DIR,
"mapred-site.xml");
+ private static final File YARN_CONF_FILE = new File(CONF_DIR,
"yarn-site.xml");
+
+ private Configuration m_dfs_conf = null;
+ protected MiniMRYarnCluster m_mr = null;
+ private Configuration m_mr_conf = null;
+
+ private static final Log LOG = LogFactory
+ .getLog(SparkMiniCluster.class);
private ExecType spark = new SparkExecType();
SparkMiniCluster() {
@@ -26,28 +43,86 @@ public class SparkMiniCluster extends Mi
protected void setupMiniDfsAndMrClusters() {
try {
CONF_DIR.mkdirs();
- if (CONF_FILE.exists()) {
- CONF_FILE.delete();
- }
- m_conf = new Configuration();
- m_conf.set("io.sort.mb", "1");
- m_conf.writeXml(new FileOutputStream(CONF_FILE));
- int dataNodes = 4;
- m_dfs = new MiniDFSCluster(m_conf, dataNodes, true, null);
+
+ // Build mini DFS cluster
+ Configuration hdfsConf = new Configuration();
+ m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(2)
+ .format(true)
+ .racks(null)
+ .build();
m_fileSys = m_dfs.getFileSystem();
+ m_dfs_conf = m_dfs.getConfiguration(0);
+
+ //Create user home directory
m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+ // Write core-site.xml
+ Configuration core_site = new Configuration(false);
+ core_site.set(FileSystem.FS_DEFAULT_NAME_KEY,
m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+ Configuration hdfs_site = new Configuration(false);
+ for (Map.Entry<String, String> conf : m_dfs_conf) {
+ if
(ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()),
"programatically")) {
+ hdfs_site.set(conf.getKey(),
m_dfs_conf.getRaw(conf.getKey()));
+ }
+ }
+ hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+ // Build mini YARN cluster
+ m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+ m_mr.init(m_dfs_conf);
+ m_mr.start();
+ m_mr_conf = m_mr.getConfig();
+ m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ System.getProperty("java.class.path"));
+
+ Configuration mapred_site = new Configuration(false);
+ Configuration yarn_site = new Configuration(false);
+ for (Map.Entry<String, String> conf : m_mr_conf) {
+ if
(ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()),
"programatically")) {
+ if (conf.getKey().contains("yarn")) {
+ yarn_site.set(conf.getKey(),
m_mr_conf.getRaw(conf.getKey()));
+ } else if (!conf.getKey().startsWith("dfs")){
+ mapred_site.set(conf.getKey(),
m_mr_conf.getRaw(conf.getKey()));
+ }
+ }
+ }
+
+ mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+ yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+ m_conf = m_mr_conf;
+ System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+ System.setProperty("hadoop.log.dir", "build/test/logs");
} catch (IOException e) {
throw new RuntimeException(e);
}
-
}
@Override
protected void shutdownMiniMrClusters() {
- if (CONF_FILE.exists()) {
- CONF_FILE.delete();
+ deleteConfFiles();
+ if (m_mr != null) {
+ m_mr.stop();
+ m_mr = null;
}
}
+ private void deleteConfFiles() {
+
+ if(CORE_CONF_FILE.exists()) {
+ CORE_CONF_FILE.delete();
+ }
+ if(HDFS_CONF_FILE.exists()) {
+ HDFS_CONF_FILE.delete();
+ }
+ if(MAPRED_CONF_FILE.exists()) {
+ MAPRED_CONF_FILE.delete();
+ }
+ if(YARN_CONF_FILE.exists()) {
+ YARN_CONF_FILE.delete();
+ }
+ }
}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1683465&r1=1683464&r2=1683465&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Thu Jun 4 05:46:07 2015
@@ -107,6 +107,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
@@ -129,12 +130,15 @@ public class SparkLauncher extends Launc
private static JavaSparkContext sparkContext = null;
private static JobMetricsListener jobMetricsListener = new
JobMetricsListener();
private String jobGroupID;
+ private PigContext pigContext = null;
+ private String currentDirectoryPath = null;
@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
PigContext pigContext) throws Exception {
if (LOG.isDebugEnabled())
LOG.debug(physicalPlan);
+ this.pigContext = pigContext;
saveUdfImportList(pigContext);
JobConf jobConf = SparkUtil.newJobConf(pigContext);
jobConf.set(PigConstants.LOCAL_CODE_DIR,
@@ -158,10 +162,10 @@ public class SparkLauncher extends Launc
false);
jobMetricsListener.reset();
- String currentDirectoryPath = Paths.get(".").toAbsolutePath()
+ this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
.normalize().toString()
+ "/";
- startSparkJob(pigContext, currentDirectoryPath);
+ startSparkJob();
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
physicalPlan, POStore.class);
POStore firstStore = stores.getFirst();
@@ -198,7 +202,7 @@ public class SparkLauncher extends Launc
convertMap.put(POFRJoin.class, new FRJoinConverter());
sparkPlanToRDD(sparkplan, convertMap, sparkStats, jobConf);
- cleanUpSparkJob(pigContext, currentDirectoryPath);
+ cleanUpSparkJob();
sparkStats.finish();
return sparkStats;
@@ -252,8 +256,7 @@ public class SparkLauncher extends Launc
return unseenJobIDs;
}
- private void cleanUpSparkJob(PigContext pigContext,
- String currentDirectoryPath) {
+ private void cleanUpSparkJob() {
LOG.info("clean up Spark Job");
boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
@@ -287,56 +290,34 @@ public class SparkLauncher extends Launc
}
}
- private void startSparkJob(PigContext pigContext,
- String currentDirectoryPath) throws IOException {
+ private void startSparkJob() throws IOException {
LOG.info("start Spark Job");
String shipFiles = pigContext.getProperties().getProperty(
"pig.streaming.ship.files");
- shipFiles(shipFiles, currentDirectoryPath);
+ shipFiles(shipFiles);
String cacheFiles = pigContext.getProperties().getProperty(
"pig.streaming.cache.files");
- cacheFiles(cacheFiles, currentDirectoryPath, pigContext);
+ cacheFiles(cacheFiles);
+
}
- private void shipFiles(String shipFiles, String currentDirectoryPath)
+
+ private void shipFiles(String shipFiles)
throws IOException {
if (shipFiles != null) {
for (String file : shipFiles.split(",")) {
File shipFile = new File(file.trim());
if (shipFile.exists()) {
LOG.info(String.format("shipFile:%s",
shipFile));
- boolean isLocal =
System.getenv("SPARK_MASTER") != null ? System
-
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL")
- : true;
- if (isLocal) {
- File localFile = new
File(currentDirectoryPath + "/"
- +
shipFile.getName());
- if (localFile.exists()) {
- LOG.info(String.format(
- "ship
file %s exists, ready to delete",
-
localFile.getAbsolutePath()));
- localFile.delete();
- } else {
-
LOG.info(String.format("ship file %s not exists,",
-
localFile.getAbsolutePath()));
- }
- Files.copy(shipFile.toPath(),
-
Paths.get(localFile.getAbsolutePath()));
- } else {
-
sparkContext.addFile(shipFile.toURI().toURL()
-
.toExternalForm());
- }
+
addJarToSparkJobWorkingDirectory(shipFile,shipFile.getName());
}
}
}
}
- private void cacheFiles(String cacheFiles, String currentDirectoryPath,
- PigContext pigContext) throws IOException {
+ private void cacheFiles(String cacheFiles) throws IOException {
if (cacheFiles != null) {
Configuration conf = SparkUtil.newJobConf(pigContext);
- boolean isLocal = System.getenv("SPARK_MASTER") != null
? System
-
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
for (String file : cacheFiles.split(",")) {
String fileName = extractFileName(file.trim());
Path src = new
Path(extractFileUrl(file.trim()));
@@ -345,28 +326,36 @@ public class SparkLauncher extends Launc
FileSystem fs = tmpFilePath.getFileSystem(conf);
fs.copyToLocalFile(src, tmpFilePath);
tmpFile.deleteOnExit();
- if (isLocal) {
- File localFile = new
File(currentDirectoryPath + "/"
- + fileName);
- if (localFile.exists()) {
- LOG.info(String.format(
- "cache file %s
exists, ready to delete",
-
localFile.getAbsolutePath()));
- localFile.delete();
- } else {
- LOG.info(String.format("cache
file %s not exists,",
-
localFile.getAbsolutePath()));
- }
-
Files.copy(Paths.get(tmpFilePath.toString()),
-
Paths.get(localFile.getAbsolutePath()));
- } else {
-
sparkContext.addFile(tmpFile.toURI().toURL()
- .toExternalForm());
- }
+ LOG.info(String.format("cacheFile:%s", fileName));
+ addJarToSparkJobWorkingDirectory(tmpFile, fileName);
}
}
}
+ private void addJarToSparkJobWorkingDirectory(File jarFile, String
jarName) throws IOException {
+ LOG.info("Added jar "+jarName);
+ boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
+ .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
+ if (isLocal) {
+ File localFile = new File(currentDirectoryPath + "/"
+ + jarName);
+ if (localFile.exists()) {
+ LOG.info(String.format(
+ "jar file %s exists, ready to delete",
+ localFile.getAbsolutePath()));
+ localFile.delete();
+ } else {
+ LOG.info(String.format("jar file %s not exists,",
+ localFile.getAbsolutePath()));
+ }
+ Files.copy(Paths.get(new
Path(jarFile.getAbsolutePath()).toString()),
+ Paths.get(localFile.getAbsolutePath()));
+ } else {
+ sparkContext.addFile(jarFile.toURI().toURL()
+ .toExternalForm());
+ }
+ }
+
private String extractFileName(String cacheFileUrl) {
String[] tmpAry = cacheFileUrl.split("#");
String fileName = tmpAry != null && tmpAry.length == 2 ?
tmpAry[1]
@@ -481,13 +470,28 @@ public class SparkLauncher extends Launc
}
}
- private void sparkOperToRDD(SparkOperPlan sparkPlan,
+ private void addUDFJarsToSparkJobWorkingDirectory(SparkOperator leaf)
throws IOException {
+
+ for (String udf : leaf.UDFs) {
+ Class clazz = pigContext.getClassForAlias(udf);
+ if (clazz != null) {
+ String jar = JarManager.findContainingJar(clazz);
+ if( jar != null) {
+ File jarFile = new File(jar);
+ addJarToSparkJobWorkingDirectory(jarFile,
jarFile.getName());
+ }
+ }
+ }
+ }
+
+ private void sparkOperToRDD(SparkOperPlan sparkPlan,
SparkOperator sparkOperator,
Map<OperatorKey, RDD<Tuple>> sparkOpRdds,
Map<OperatorKey, RDD<Tuple>> physicalOpRdds,
Map<Class<? extends PhysicalOperator>, POConverter>
convertMap,
Set<Integer> seenJobIDs, SparkPigStats sparkStats,
JobConf conf)
throws IOException, InterruptedException {
+ addUDFJarsToSparkJobWorkingDirectory(sparkOperator);
List<SparkOperator> predecessors = sparkPlan
.getPredecessors(sparkOperator);
List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1683465&r1=1683464&r2=1683465&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Thu Jun 4
05:46:07 2015
@@ -61,7 +61,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestGrunt {
-
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private String basedir = "test/org/apache/pig/test/data";
@@ -929,13 +928,21 @@ public class TestGrunt {
Grunt grunt = new Grunt(new BufferedReader(reader), context);
boolean caught = false;
- try {
- grunt.exec();
- } catch (Exception e) {
- caught = true;
- assertTrue(e.getMessage().contains("baz does not exist"));
+ // in mr mode, the output file 'baz' will be automatically deleted if
the mr job fails
+ // when "cat baz;" is executed, it throws "Encountered IOException.
Directory baz does not exist"
+ // in GruntParser#processCat() and variable "caught" is true
+ // in spark mode, the output file 'baz' will not be automatically
deleted even the job fails(see SPARK-7953)
+ // when "cat baz;" is executed, it does not throw exception and the
variable "caught" is false
+ // TODO: Enable this for Spark when SPARK-7953 is resolved
+ if(!Util.isSparkExecType(cluster.getExecType())) {
+ try {
+ grunt.exec();
+ } catch (Exception e) {
+ caught = true;
+ assertTrue(e.getMessage().contains("baz does not exist"));
+ }
+ assertTrue(caught);
}
- assertTrue(caught);
}
@Test
@@ -1473,7 +1480,7 @@ public class TestGrunt {
JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper();
javaCompilerHelper.compile(tmpDir.getAbsolutePath(),
new
JavaCompilerHelper.JavaSourceFromString("com.xxx.udf.TestUDF", udfSrc));
-
+
String jarName = "TestUDFJar.jar";
String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName;
int status = Util.executeJavaCommand("jar -cf " + jarFile +
@@ -1496,7 +1503,7 @@ public class TestGrunt {
boolean found = false;
for (String line : lines) {
if (line.matches(".*Added jar .*" + jarName + ".*")) {
- // MR mode
+ // MR and Spark mode
found = true;
} else if (line.matches(".*Local resource.*" + jarName + ".*")) {
// Tez mode