Author: xuefu
Date: Tue May 19 18:15:23 2015
New Revision: 1680363
URL: http://svn.apache.org/r1680363
Log:
PIG-4295: Enable unit test TestPigContext for spark (Liyun via Xuefu)
Modified:
pig/branches/spark/build.xml
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/impl/PigContext.java
Modified: pig/branches/spark/build.xml
URL:
http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1680363&r1=1680362&r2=1680363&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Tue May 19 18:15:23 2015
@@ -120,7 +120,7 @@
<!-- test configuration, use ${user.home}/build.properties to configure
values -->
<property name="ssh.gateway" value="" />
<property name="hod.server" value="" />
- <property name="test.output" value="yes"/>
+ <property name="test.output" value="no"/>
<!-- e2e test properties -->
<property name="test.e2e.dir" value="${basedir}/test/e2e/pig"/>
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1680363&r1=1680362&r2=1680363&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Tue May 19 18:15:23 2015
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.File;
@@ -131,6 +132,7 @@ public class SparkLauncher extends Launc
PigContext pigContext) throws Exception {
if (LOG.isDebugEnabled())
LOG.debug(physicalPlan);
+ saveUdfImportList(pigContext);
JobConf jobConf = SparkUtil.newJobConf(pigContext);
jobConf.set(PigConstants.LOCAL_CODE_DIR,
System.getProperty("java.io.tmpdir"));
@@ -638,4 +640,16 @@ public class SparkLauncher extends Launc
// TODO Auto-generated method stub
}
+
+ /**
+ * In spark mode, we store the value of udf.import.list in
+ * PigContext#properties.getProperty("spark.udf.import.list").
+ * Later we use PigContext#properties.getProperty("spark.udf.import.list")
+ * in PigContext#writeObject.
+ * We don't save this value in PigContext#properties.getProperty("udf.import.list")
+ * because that would cause an OOM problem (see PIG-4295 for details).
+ * @param pigContext
+ */
+ private void saveUdfImportList(PigContext pigContext) {
+ String udfImportList =
Joiner.on(",").join(PigContext.getPackageImportList());
+ pigContext.getProperties().setProperty("spark.udf.import.list",
udfImportList);
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1680363&r1=1680362&r2=1680363&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Tue May 19
18:15:23 2015
@@ -17,6 +17,9 @@
*/
package org.apache.pig.impl;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -26,6 +29,8 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
@@ -63,6 +68,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
@@ -908,4 +914,42 @@ public class PigContext implements Seria
classloader = new ContextClassLoader(cl);
}
}
+
+ /**
+ * In spark mode, JavaSerializer serializes objects when a task starts
+ * (see PIG-4295 for details).
+ * packageImportList is a ThreadLocal variable which cannot be serialized,
+ * so we override the writeObject method to serialize its value.
+ * @param out
+ * @throws IOException
+ */
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ if(getExecType() instanceof SparkExecType) {
+ ArrayList<String> udfImportList = new ArrayList<String>();
+ if (packageImportList.get() == null) {
+ String udfImportListStr =
properties.getProperty("spark.udf.import.list");
+ if (udfImportListStr != null) {
+ udfImportList =
Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
+ }
+ } else {
+ udfImportList = packageImportList.get();
+ }
+ out.writeObject(udfImportList);
+ }
+ }
+
+ /**
+ * packageImportList is a ThreadLocal variable which cannot be deserialized
+ * automatically, so we override the readObject method to deserialize the
+ * value and set it into packageImportList.
+ * @param in
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
+ in.defaultReadObject();
+ if(getExecType() instanceof SparkExecType) {
+ ArrayList<String> udfImportList = (ArrayList<String>)
in.readObject();
+ packageImportList.set(udfImportList);
+ }
+ }
}