Author: xuefu
Date: Tue May 19 18:15:23 2015
New Revision: 1680363

URL: http://svn.apache.org/r1680363
Log:
PIG-4295: Enable unit test TestPigContext for spark (Liyun via Xuefu)

Modified:
    pig/branches/spark/build.xml
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/impl/PigContext.java

Modified: pig/branches/spark/build.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1680363&r1=1680362&r2=1680363&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Tue May 19 18:15:23 2015
@@ -120,7 +120,7 @@
     <!-- test configuration, use ${user.home}/build.properties to configure 
values  -->
     <property name="ssh.gateway" value="" />
     <property name="hod.server" value="" />
-    <property name="test.output" value="yes"/>
+    <property name="test.output" value="no"/>
 
     <!-- e2e test properties -->
     <property name="test.e2e.dir" value="${basedir}/test/e2e/pig"/>

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1680363&r1=1680362&r2=1680363&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Tue May 19 18:15:23 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
 import java.io.File;
@@ -131,6 +132,7 @@ public class SparkLauncher extends Launc
                        PigContext pigContext) throws Exception {
                if (LOG.isDebugEnabled())
                    LOG.debug(physicalPlan);
+        saveUdfImportList(pigContext);
                JobConf jobConf = SparkUtil.newJobConf(pigContext);
                jobConf.set(PigConstants.LOCAL_CODE_DIR,
                                System.getProperty("java.io.tmpdir"));
@@ -638,4 +640,16 @@ public class SparkLauncher extends Launc
                // TODO Auto-generated method stub
 
        }
+
+    /**
+     * We store the value of udf.import.list in PigContext#properties.getProperty("spark.udf.import.list") in spark mode.
+     * Later we will use PigContext#properties.getProperty("spark.udf.import.list") in PigContext#writeObject.
+     * We don't save this value in PigContext#properties.getProperty("udf.import.list")
+     * because that would cause an OOM problem (see PIG-4295 for details).
+     * @param pigContext the PigContext whose properties hold the import list
+     */
+    private void saveUdfImportList(PigContext pigContext) {
+        String udfImportList = 
Joiner.on(",").join(PigContext.getPackageImportList());
+        pigContext.getProperties().setProperty("spark.udf.import.list", 
udfImportList);
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1680363&r1=1680362&r2=1680363&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Tue May 19 
18:15:23 2015
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.impl;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -26,6 +29,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.lang.reflect.Constructor;
@@ -63,6 +68,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
@@ -908,4 +914,42 @@ public class PigContext implements Seria
             classloader = new ContextClassLoader(cl);
         }
     }
+
+    /**
+     * In spark mode, JavaSerializer serializes objects when a task starts (see PIG-4295 for details).
+     * packageImportList is a ThreadLocal variable which cannot be serialized,
+     * so we override the writeObject method to serialize the value of packageImportList.
+     * @param out the stream to which this object is serialized
+     * @throws IOException
+     */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        if(getExecType() instanceof SparkExecType) {
+            ArrayList<String> udfImportList = new ArrayList<String>();
+            if (packageImportList.get() == null) {
+                String udfImportListStr = 
properties.getProperty("spark.udf.import.list");
+                if (udfImportListStr != null) {
+                    udfImportList = 
Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
+                }
+            } else {
+                udfImportList = packageImportList.get();
+            }
+            out.writeObject(udfImportList);
+        }
+    }
+
+    /**
+     * packageImportList is a ThreadLocal variable which cannot be deserialized,
+     * so we override the readObject method to deserialize the value and set it on packageImportList.
+     * @param in the stream from which this object is deserialized
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        if(getExecType() instanceof SparkExecType) {
+            ArrayList<String> udfImportList = (ArrayList<String>) 
in.readObject();
+            packageImportList.set(udfImportList);
+        }
+   }
 }


Reply via email to