Author: xuefu
Date: Wed Oct 26 15:47:56 2016
New Revision: 1766694

URL: http://svn.apache.org/viewvc?rev=1766694&view=rev
Log:
PIG-4920: Fail to use Javascript UDF in spark yarn client mode (Liyun via Xuefu)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/impl/PigContext.java
    pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java?rev=1766694&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
 Wed Oct 26 15:47:56 2016
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * The purpose of SparkEngineConf is to solve the initialization problem of
+ * PigContext.properties.get("udf.import.list"), UDFContext#udfConfs and
+ * UDFContext#clientSysProps in spark mode. These variables can not be
+ * serialized because they are ThreadLocal variables. In MR mode, they are
+ * serialized in JobConfiguration in JobControlCompiler#getJob and deserialized
+ * by JobConfiguration in PigGenericMapBase#setup. But there is no setup() in
+ * spark like there is in MR, so there is no equivalent hook in which these
+ * variables can be correctly deserialized before spark programs call them.
+ * Here we use the following solution:
+ * these variables are saved in SparkEngineConf#writeObject, and then made
+ * available and initialized in SparkEngineConf#readObject in the spark
+ * executor thread.
+ */
+public class SparkEngineConf implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Log log = LogFactory.getLog(SparkEngineConf.class);
+    private static final String SPARK_UDF_IMPORT_LIST = "spark.udf.import.list";
+    private static final String SPARK_UDFCONTEXT_UDFCONFS = "spark.udfcontext.udfConfs";
+    private static final String SPARK_UDFCONTEXT_CLIENTSYSPROPS = "spark.udfcontext.clientSysProps";
+
+    // The stream layout is defined entirely by writeObject/readObject below (three
+    // objects: import list, udfConfs string, clientSysProps string), so this field is
+    // never written by default serialization; readObject rebuilds it.
+    private transient Properties properties = new Properties();
+
+    public SparkEngineConf() {
+    }
+
+    /**
+     * Reads the three objects written by {@link #writeObject} and installs them in
+     * the current thread: the UDF import list via PigContext#setPackageImportList and
+     * the two serialized strings via UDFContext#deserializeForSpark.
+     *
+     * @param in stream positioned at the data written by writeObject
+     * @throws IOException if reading or deserializing fails
+     * @throws ClassNotFoundException if a class in the stream can not be resolved
+     */
+    @SuppressWarnings("unchecked") // writeObject only ever writes an ArrayList<String> (or null) first
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        // Field initializers are not run when an object is deserialized, so the
+        // backing Properties has to be re-created here; otherwise any later call to
+        // setSparkUdfImportListStr or writeObject on this instance would hit null.
+        properties = new Properties();
+
+        ArrayList<String> udfImportList = (ArrayList<String>) in.readObject();
+        if (udfImportList != null) {
+            PigContext.setPackageImportList(udfImportList);
+            properties.setProperty(SPARK_UDF_IMPORT_LIST, joinWithComma(udfImportList));
+        }
+
+        String udfConfsStr = (String) in.readObject();
+        String clientSysPropsStr = (String) in.readObject();
+        // Properties rejects null values, so only remember what was actually sent.
+        if (udfConfsStr != null) {
+            properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, udfConfsStr);
+        }
+        if (clientSysPropsStr != null) {
+            properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, clientSysPropsStr);
+        }
+        UDFContext.getUDFContext().deserializeForSpark(udfConfsStr, clientSysPropsStr);
+    }
+
+    /**
+     * Writes, in order: the UDF import list (an ArrayList of String, or null when no
+     * import list was recorded), the serialized UDFContext#udfConfs and the
+     * serialized UDFContext#clientSysProps.
+     *
+     * @param out stream to write to
+     * @throws IOException if serialization fails
+     */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        String udfImportListStr = properties.getProperty(SPARK_UDF_IMPORT_LIST);
+        ArrayList<String> udfImportList = null;
+        if (udfImportListStr != null) {
+            // Empty segments are kept on purpose: they are entries of the list that
+            // was joined with "," in the first place.
+            udfImportList = Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
+        } else {
+            // Splitter#split rejects null; write null instead so that readObject
+            // leaves the package import list of the reading thread untouched.
+            log.warn("No UDF import list was set on SparkEngineConf; none will be serialized");
+        }
+        out.writeObject(udfImportList);
+
+        //2 threads call SparkEngineConf#writeObject
+        //In main thread: SparkLauncher#initialize->SparkUtil#newJobConf
+        //                ->ObjectSerializer#serialize->SparkEngineConf#writeObject
+        //In dag-scheduler-event-loop thread:
+        //                DAGScheduler.submitMissingTasks->JavaSerializationStream.writeObject
+        //
+        //In main thread, UDFContext#getUDFContext is not empty, we store
+        //UDFContext#udfConfs and UDFContext#clientSysProps into properties and serialize them.
+        //In dag-scheduler-event-loop thread, UDFContext#getUDFContext is empty, we get the value
+        //of UDFContext#udfConfs and UDFContext#clientSysProps from properties and serialize them.
+        if (!UDFContext.getUDFContext().isUDFConfEmpty()) {
+            //In SparkUtil#newJobConf(), sparkEngineConf is serialized in job configuration and
+            //will call SparkEngineConf#writeObject (at this time UDFContext#udfConfs and
+            //UDFContext#clientSysProps are not null). Later spark will call
+            //JavaSerializationStream.writeObject to serialize all objects when submitting spark
+            //jobs (at that time UDFContext#udfConfs and UDFContext#clientSysProps are null), so
+            //we need to save their values in SparkEngineConf#properties after these two
+            //variables are correctly initialized in SparkUtil#newJobConf. For more detail see
+            //PIG-4920.
+            String udfConfsStr = ObjectSerializer.serialize(UDFContext.getUDFContext().getUdfConfs());
+            String clientSysPropsStr =
+                    ObjectSerializer.serialize(UDFContext.getUDFContext().getClientSysProps());
+            this.properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, udfConfsStr);
+            this.properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, clientSysPropsStr);
+            out.writeObject(udfConfsStr);
+            out.writeObject(clientSysPropsStr);
+        } else {
+            out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS));
+            out.writeObject(this.properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS));
+        }
+    }
+
+    /**
+     * Records the UDF import list that writeObject will serialize.
+     *
+     * @param sparkUdfImportListStr the import list entries joined with ","; must not be null
+     */
+    public void setSparkUdfImportListStr(String sparkUdfImportListStr) {
+        this.properties.setProperty(SPARK_UDF_IMPORT_LIST, sparkUdfImportListStr);
+    }
+
+    /** Joins the entries with "," - the inverse of the split done in writeObject. */
+    private static String joinWithComma(ArrayList<String> entries) {
+        StringBuilder joined = new StringBuilder();
+        for (int i = 0; i < entries.size(); i++) {
+            if (i > 0) {
+                joined.append(',');
+            }
+            joined.append(entries.get(i));
+        }
+        return joined.toString();
+    }
+}

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
 Wed Oct 26 15:47:56 2016
@@ -143,6 +143,7 @@ public class SparkLauncher extends Launc
     private PigContext pigContext = null;
     private JobConf jobConf = null;
     private String currentDirectoryPath = null;
+    private SparkEngineConf sparkEngineConf = new SparkEngineConf();
 
     @Override
     public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
@@ -181,7 +182,7 @@ public class SparkLauncher extends Launc
         Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
                 = new HashMap<Class<? extends PhysicalOperator>, 
RDDConverter>();
         convertMap.put(POLoad.class, new LoadConverter(pigContext,
-                physicalPlan, sparkContext.sc(), jobConf));
+                physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf));
         convertMap.put(POStore.class, new StoreConverter(jobConf));
         convertMap.put(POForEach.class, new ForEachConverter());
         convertMap.put(POFilter.class, new FilterConverter());
@@ -633,19 +634,18 @@ public class SparkLauncher extends Launc
     }
 
     /**
-     * We store the value of udf.import.list in 
PigContext#properties.getProperty("spark.udf.import.list") in spark mode.
-     * Later we will use 
PigContext#properties.getProperty("spark.udf.import.list")in 
PigContext#writeObject.
-     * we don't save this value in 
PigContext#properties.getProperty("udf.import.list")
-     * because this will cause OOM problem(detailed see PIG-4295).
+     * We store the value of udf.import.list in SparkEngineConf#properties
+     * Later we will serialize it in SparkEngineConf#writeObject and 
deserialize in SparkEngineConf#readObject. More
+     * detail see PIG-4920
      */
     private void saveUdfImportList() {
         String udfImportList = 
Joiner.on(",").join(PigContext.getPackageImportList());
-        pigContext.getProperties().setProperty("spark.udf.import.list", 
udfImportList);
+        sparkEngineConf.setSparkUdfImportListStr(udfImportList);
     }
 
     private void initialize(PhysicalPlan physicalPlan) throws IOException {
         saveUdfImportList();
-        jobConf = SparkUtil.newJobConf(pigContext, physicalPlan);
+        jobConf = SparkUtil.newJobConf(pigContext, physicalPlan, 
sparkEngineConf);
         SchemaTupleBackend.initialize(jobConf, pigContext);
         Utils.setDefaultTimeZone(jobConf);
         PigMapReduce.sJobConfInternal.set(jobConf);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
 Wed Oct 26 15:47:56 2016
@@ -75,12 +75,18 @@ public class SparkUtil {
         return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class);
     }
 
-    public static JobConf newJobConf(PigContext pigContext, PhysicalPlan 
physicalPlan) throws IOException {
+    public static JobConf newJobConf(PigContext pigContext, PhysicalPlan 
physicalPlan, SparkEngineConf sparkEngineConf) throws
+            IOException {
         JobConf jobConf = new JobConf(
                 ConfigurationUtil.toConfiguration(pigContext.getProperties()));
-        // Serialize the PigContext so it's available in Executor thread.
+        //Serialize the thread local variable UDFContext#udfConfs, 
UDFContext#clientSysProps and PigContext#packageImportList
+        //inside SparkEngineConf separately
+        
jobConf.set("spark.engine.conf",ObjectSerializer.serialize(sparkEngineConf));
+        //Serialize the PigContext so it's available in Executor thread.
         jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
         // Serialize the thread local variable inside PigContext separately
+        // Although after PIG-4920, we store udf.import.list in SparkEngineConf
+        // but we still need store it in jobConf because it will be used in 
PigOutputFormat#setupUdfEnvAndStores
         jobConf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
         Random rand = new Random();

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
 Wed Oct 26 15:47:56 2016
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
@@ -69,13 +70,15 @@ public class LoadConverter implements RD
     private PhysicalPlan physicalPlan;
     private SparkContext sparkContext;
     private JobConf jobConf;
+    private SparkEngineConf sparkEngineConf;
 
     public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan,
-            SparkContext sparkContext, JobConf jobConf) {
+            SparkContext sparkContext, JobConf jobConf, SparkEngineConf 
sparkEngineConf) {
         this.pigContext = pigContext;
         this.physicalPlan = physicalPlan;
         this.sparkContext = sparkContext;
         this.jobConf = jobConf;
+        this.sparkEngineConf = sparkEngineConf;
     }
 
     @Override
@@ -106,7 +109,7 @@ public class LoadConverter implements RD
 
         registerUdfFiles();
 
-        ToTupleFunction ttf = new ToTupleFunction();
+        ToTupleFunction ttf = new ToTupleFunction(sparkEngineConf);
 
         //create SparkCounter and set it for ToTupleFunction
         boolean disableCounter = jobConf.getBoolean("pig.disable.counter", 
false);
@@ -153,6 +156,12 @@ public class LoadConverter implements RD
         private String counterName;
         private SparkCounters sparkCounters;
         private boolean disableCounter;
+        private SparkEngineConf sparkEngineConf;
+
+        /**
+         * Creates the function and keeps a reference to the given SparkEngineConf.
+         * NOTE(review): presumably the reference is held so that the SparkEngineConf
+         * is serialized along with this function and its readObject runs on the
+         * executor side - confirm against how spark ships this function.
+         *
+         * @param sparkEngineConf the engine configuration to carry with this function
+         */
+        public ToTupleFunction(SparkEngineConf sparkEngineConf){
+               this.sparkEngineConf = sparkEngineConf;
+
+        }
 
         @Override
         public Tuple apply(Tuple2<Text, Tuple> v1) {
@@ -223,7 +232,6 @@ public class LoadConverter implements RD
         jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
                 ObjectSerializer.serialize(inpSignatures));
         jobConf.set(PigInputFormat.PIG_INPUT_LIMITS, 
ObjectSerializer.serialize(inpLimits));
-
         return jobConf;
     }
 

Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Wed Oct 26 
15:47:56 2016
@@ -66,7 +66,6 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
@@ -912,43 +911,5 @@ public class PigContext implements Seria
         } else {
             classloader = new ContextClassLoader(cl);
         }
-    }
-
-    /**
-     * In spark mode, JavaSerializer will serialize objects when the a task 
starts(Detailed see PIG-4295)
-     * packageImportList is a ThreadLocal variable which can not be serialized.
-     * we overwrites writeObject method to serialize the value of 
packageImportList.
-     * @param out
-     * @throws IOException
-     */
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
-        if(getExecType() instanceof SparkExecType) {
-            ArrayList<String> udfImportList = new ArrayList<String>();
-            if (packageImportList.get() == null) {
-                String udfImportListStr = 
properties.getProperty("spark.udf.import.list");
-                if (udfImportListStr != null) {
-                    udfImportList = 
Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
-                }
-            } else {
-                udfImportList = packageImportList.get();
-            }
-            out.writeObject(udfImportList);
-        }
-    }
-
-    /**
-     * packageImportList is ThreadLocal variable which can not be deserialized.
-     * we overwrites readObject method to deserialize the value and set it to 
packageImportList.
-     * @param in
-     * @throws IOException
-     * @throws ClassNotFoundException
-     */
-    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-        in.defaultReadObject();
-        if(getExecType() instanceof SparkExecType) {
-            ArrayList<String> udfImportList = (ArrayList<String>) 
in.readObject();
-            packageImportList.set(udfImportList);
-        }
    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Wed Oct 26 
15:47:56 2016
@@ -76,7 +76,7 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
-    HashMap<UDFContextKey, Properties> getUdfConfs() {
+    public HashMap<UDFContextKey, Properties> getUdfConfs() {
         return udfConfs;
     }
 
@@ -218,6 +218,14 @@ public class UDFContext {
                 jconf.get(CLIENT_SYS_PROPS));
     }
 
+    public void deserializeForSpark(String udfConfsStr, String 
clientSysPropsStr) throws IOException {
+        if( udfConfsStr!= null && clientSysPropsStr!=null) {
+            udfConfs = (HashMap<UDFContextKey, Properties>) 
ObjectSerializer.deserialize(udfConfsStr);
+            clientSysProps = (Properties) ObjectSerializer.deserialize(
+                    clientSysPropsStr);
+        }
+    }
+
     private UDFContextKey generateKey(Class<?> c, String[] args) {
         return new UDFContextKey(c.getName(), args);
     }
@@ -314,4 +322,8 @@ public class UDFContext {
         }
     }
 
+    /**
+     * Returns the client system properties held by this UDFContext. The field
+     * itself is returned, not a copy.
+     *
+     * @return the clientSysProps field, as currently set
+     */
+    public Properties getClientSysProps() {
+        return clientSysProps;
+    }
+
 }
\ No newline at end of file


Reply via email to