Author: xuefu
Date: Thu Jun 18 04:08:23 2015
New Revision: 1686137
URL: http://svn.apache.org/r1686137
Log:
PIG-4606: Enable TestDefaultDateTimeZone unit tests in spark mode (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1686137&r1=1686136&r2=1686137&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java	Thu Jun 18 04:08:23 2015
@@ -89,6 +89,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
@@ -112,6 +113,7 @@ public class SparkLauncher extends Launc
private static JobMetricsListener jobMetricsListener = new
JobMetricsListener();
private String jobGroupID;
private PigContext pigContext = null;
+ // Assigned by initialize() via SparkUtil.newJobConf(pigContext); null until then.
+ private JobConf jobConf = null;
private String currentDirectoryPath = null;
@Override
@@ -120,12 +122,7 @@ public class SparkLauncher extends Launc
if (LOG.isDebugEnabled())
LOG.debug(physicalPlan);
this.pigContext = pigContext;
- saveUdfImportList(pigContext);
- JobConf jobConf = SparkUtil.newJobConf(pigContext);
- jobConf.set(PigConstants.LOCAL_CODE_DIR,
- System.getProperty("java.io.tmpdir"));
-
- SchemaTupleBackend.initialize(jobConf, pigContext);
+ initialize();
SparkOperPlan sparkplan = compile(physicalPlan, pigContext);
if (LOG.isDebugEnabled())
explain(sparkplan, System.out, "text", true);
@@ -146,7 +143,7 @@ public class SparkLauncher extends Launc
this.currentDirectoryPath = Paths.get(".").toAbsolutePath()
.normalize().toString()
+ "/";
- startSparkJob();
+ addFilesToSparkJob();
LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
physicalPlan, POStore.class);
POStore firstStore = stores.getFirst();
@@ -272,15 +269,14 @@ public class SparkLauncher extends Launc
}
}
- private void startSparkJob() throws IOException {
- LOG.info("start Spark Job");
+ /**
+ * Forwards the "pig.streaming.ship.files" property to shipFiles() and the
+ * "pig.streaming.cache.files" property to cacheFiles().
+ * NOTE(review): Properties#getProperty returns null for an unset key, so
+ * either helper may receive null; assumes both tolerate that -- confirm.
+ *
+ * @throws IOException presumably propagated from shipFiles/cacheFiles -- confirm
+ */
+ private void addFilesToSparkJob() throws IOException {
+ LOG.info("add Files Spark Job");
String shipFiles = pigContext.getProperties().getProperty(
"pig.streaming.ship.files");
shipFiles(shipFiles);
String cacheFiles = pigContext.getProperties().getProperty(
"pig.streaming.cache.files");
cacheFiles(cacheFiles);
-
}
@@ -645,10 +641,19 @@ public class SparkLauncher extends Launc
* Later we will use
PigContext#properties.getProperty("spark.udf.import.list")in
PigContext#writeObject.
* we don't save this value in
PigContext#properties.getProperty("udf.import.list")
* because this will cause OOM problem(detailed see PIG-4295).
- * @param pigContext
*/
- private void saveUdfImportList(PigContext pigContext) {
+ private void saveUdfImportList() {
String udfImportList =
Joiner.on(",").join(PigContext.getPackageImportList());
pigContext.getProperties().setProperty("spark.udf.import.list",
udfImportList);
}
+
+ /**
+ * Prepares launcher state; launchPig calls this before compile().
+ * Steps, in order: store the UDF import list under "spark.udf.import.list"
+ * (saveUdfImportList), build the JobConf kept in the jobConf field with
+ * PigConstants.LOCAL_CODE_DIR set to java.io.tmpdir, initialize
+ * SchemaTupleBackend from that JobConf, then call
+ * Utils.setDefaultTimeZone(jobConf). The last call was added by PIG-4606 so
+ * TestDefaultDateTimeZone can run in Spark mode; presumably it applies the
+ * configured default time zone -- confirm against Utils.
+ *
+ * @throws IOException presumably from SparkUtil.newJobConf or
+ *         SchemaTupleBackend.initialize -- confirm
+ */
+ private void initialize() throws IOException {
+ saveUdfImportList();
+ jobConf = SparkUtil.newJobConf(pigContext);
+ jobConf.set(PigConstants.LOCAL_CODE_DIR,
System.getProperty("java.io.tmpdir"));
+
+ SchemaTupleBackend.initialize(jobConf, pigContext);
+ Utils.setDefaultTimeZone(jobConf);
+ }
}