Author: xuefu
Date: Wed Oct 26 15:47:56 2016
New Revision: 1766694
URL: http://svn.apache.org/viewvc?rev=1766694&view=rev
Log:
PIG-4920: Fail to use Javascript UDF in spark yarn client mode (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/src/org/apache/pig/impl/PigContext.java
pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java?rev=1766694&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
Wed Oct 26 15:47:56 2016
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * Carries Pig's thread-local state from the Spark driver to the threads that run Spark tasks.
+ *
+ * <p>PigContext's package import list ({@code udf.import.list}), {@code UDFContext#udfConfs} and
+ * {@code UDFContext#clientSysProps} are held in ThreadLocal variables, so they are not captured
+ * when objects referencing them are serialized. In MR mode they are serialized into the job
+ * configuration in JobControlCompiler#getJob and restored from it in PigGenericMapBase#setup.
+ * Spark has no equivalent of MR's setup() hook, so these values must be restored some other way
+ * before Spark code uses them.
+ *
+ * <p>Solution: {@link #writeObject} saves the values into the serialized form of this object, and
+ * {@link #readObject} re-installs them into the thread-local state of whichever thread
+ * deserializes it (the Spark executor thread). See PIG-4920.
+ */
+public class SparkEngineConf implements Serializable {
+
+    private static final Log log = LogFactory.getLog(SparkEngineConf.class);
+
+    // Keys under which the captured values are cached in {@link #properties}.
+    private static final String SPARK_UDF_IMPORT_LIST = "spark.udf.import.list";
+    private static final String SPARK_UDFCONTEXT_UDFCONFS = "spark.udfcontext.udfConfs";
+    private static final String SPARK_UDFCONTEXT_CLIENTSYSPROPS = "spark.udfcontext.clientSysProps";
+
+    // Not written by default serialization (writeObject/readObject are fully custom). Field
+    // initializers do not run during deserialization, so readObject re-creates this map.
+    private Properties properties = new Properties();
+
+    public SparkEngineConf() {
+    }
+
+    /**
+     * Reads the values written by {@link #writeObject}, installs them into the current thread's
+     * PigContext / UDFContext state, and rebuilds {@link #properties} so that this deserialized
+     * instance can itself be serialized again.
+     */
+    @SuppressWarnings("unchecked")
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        // Without defaultReadObject() and with field initializers skipped, properties is null here.
+        properties = new Properties();
+
+        ArrayList<String> udfImportList = (ArrayList<String>) in.readObject();
+        if (udfImportList != null) {
+            properties.setProperty(SPARK_UDF_IMPORT_LIST, Joiner.on(",").join(udfImportList));
+            PigContext.setPackageImportList(udfImportList);
+        }
+
+        String udfConfsStr = (String) in.readObject();
+        String clientSysPropsStr = (String) in.readObject();
+        if (udfConfsStr != null) {
+            properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, udfConfsStr);
+        }
+        if (clientSysPropsStr != null) {
+            properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, clientSysPropsStr);
+        }
+        UDFContext.getUDFContext().deserializeForSpark(udfConfsStr, clientSysPropsStr);
+    }
+
+    /**
+     * Writes, in order: the UDF import list (or {@code null} if none was set), the serialized
+     * UDFContext#udfConfs, and the serialized UDFContext#clientSysProps.
+     *
+     * <p>Two threads call this method:
+     * <ul>
+     * <li>the main thread, via SparkLauncher#initialize -> SparkUtil#newJobConf ->
+     * ObjectSerializer#serialize. UDFContext is populated there, so its current values are
+     * serialized and also cached in {@link #properties};</li>
+     * <li>the dag-scheduler-event-loop thread, via DAGScheduler.submitMissingTasks ->
+     * JavaSerializationStream.writeObject, when Spark submits jobs. UDFContext is empty in that
+     * thread, so the values cached by the earlier main-thread call are written instead.</li>
+     * </ul>
+     * See PIG-4920 for details.
+     */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        String udfImportListStr = properties.getProperty(SPARK_UDF_IMPORT_LIST);
+        // setSparkUdfImportListStr may never have been called; Splitter rejects null input.
+        ArrayList<String> udfImportList = udfImportListStr == null
+                ? null
+                : Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
+        out.writeObject(udfImportList);
+
+        UDFContext udfContext = UDFContext.getUDFContext();
+        if (!udfContext.isUDFConfEmpty()) {
+            String udfConfsStr = ObjectSerializer.serialize(udfContext.getUdfConfs());
+            String clientSysPropsStr = ObjectSerializer.serialize(udfContext.getClientSysProps());
+            // Cache for later serializations from threads whose UDFContext is empty.
+            properties.setProperty(SPARK_UDFCONTEXT_UDFCONFS, udfConfsStr);
+            properties.setProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS, clientSysPropsStr);
+            out.writeObject(udfConfsStr);
+            out.writeObject(clientSysPropsStr);
+        } else {
+            out.writeObject(properties.getProperty(SPARK_UDFCONTEXT_UDFCONFS));
+            out.writeObject(properties.getProperty(SPARK_UDFCONTEXT_CLIENTSYSPROPS));
+        }
+    }
+
+    /**
+     * Records the comma-separated UDF package import list to ship to Spark executors.
+     *
+     * @param sparkUdfImportListStr comma-joined import list; {@code null} clears any previous value
+     */
+    public void setSparkUdfImportListStr(String sparkUdfImportListStr) {
+        if (sparkUdfImportListStr == null) {
+            properties.remove(SPARK_UDF_IMPORT_LIST);
+        } else {
+            properties.setProperty(SPARK_UDF_IMPORT_LIST, sparkUdfImportListStr);
+        }
+    }
+}
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Wed Oct 26 15:47:56 2016
@@ -143,6 +143,7 @@ public class SparkLauncher extends Launc
private PigContext pigContext = null;
private JobConf jobConf = null;
private String currentDirectoryPath = null;
+ private SparkEngineConf sparkEngineConf = new SparkEngineConf();
@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
@@ -181,7 +182,7 @@ public class SparkLauncher extends Launc
Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
= new HashMap<Class<? extends PhysicalOperator>,
RDDConverter>();
convertMap.put(POLoad.class, new LoadConverter(pigContext,
- physicalPlan, sparkContext.sc(), jobConf));
+ physicalPlan, sparkContext.sc(), jobConf, sparkEngineConf));
convertMap.put(POStore.class, new StoreConverter(jobConf));
convertMap.put(POForEach.class, new ForEachConverter());
convertMap.put(POFilter.class, new FilterConverter());
@@ -633,19 +634,18 @@ public class SparkLauncher extends Launc
}
/**
- * We store the value of udf.import.list in
PigContext#properties.getProperty("spark.udf.import.list") in spark mode.
- * Later we will use
PigContext#properties.getProperty("spark.udf.import.list")in
PigContext#writeObject.
- * we don't save this value in
PigContext#properties.getProperty("udf.import.list")
- * because this will cause OOM problem(detailed see PIG-4295).
+ * We store the value of udf.import.list in SparkEngineConf#properties.
+ * It is later serialized in SparkEngineConf#writeObject and deserialized in
+ * SparkEngineConf#readObject. For more detail, see PIG-4920.
*/
private void saveUdfImportList() {
String udfImportList =
Joiner.on(",").join(PigContext.getPackageImportList());
- pigContext.getProperties().setProperty("spark.udf.import.list",
udfImportList);
+ sparkEngineConf.setSparkUdfImportListStr(udfImportList);
}
private void initialize(PhysicalPlan physicalPlan) throws IOException {
saveUdfImportList();
- jobConf = SparkUtil.newJobConf(pigContext, physicalPlan);
+ jobConf = SparkUtil.newJobConf(pigContext, physicalPlan,
sparkEngineConf);
SchemaTupleBackend.initialize(jobConf, pigContext);
Utils.setDefaultTimeZone(jobConf);
PigMapReduce.sJobConfInternal.set(jobConf);
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
Wed Oct 26 15:47:56 2016
@@ -75,12 +75,18 @@ public class SparkUtil {
return (ClassTag<Product2<K, V>>) (Object) getManifest(Product2.class);
}
- public static JobConf newJobConf(PigContext pigContext, PhysicalPlan
physicalPlan) throws IOException {
+ public static JobConf newJobConf(PigContext pigContext, PhysicalPlan
physicalPlan, SparkEngineConf sparkEngineConf) throws
+ IOException {
JobConf jobConf = new JobConf(
ConfigurationUtil.toConfiguration(pigContext.getProperties()));
- // Serialize the PigContext so it's available in Executor thread.
+ // Serialize the thread-local variables UDFContext#udfConfs, UDFContext#clientSysProps
+ // and PigContext#packageImportList separately, inside SparkEngineConf.
+
jobConf.set("spark.engine.conf",ObjectSerializer.serialize(sparkEngineConf));
+ //Serialize the PigContext so it's available in Executor thread.
jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
// Serialize the thread local variable inside PigContext separately
+ // Although after PIG-4920 udf.import.list is stored in SparkEngineConf,
+ // we still need to store it in jobConf because it is used in
+ // PigOutputFormat#setupUdfEnvAndStores.
jobConf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
Random rand = new Random();
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Wed Oct 26 15:47:56 2016
@@ -27,6 +27,7 @@ import java.util.Map;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkEngineConf;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
@@ -69,13 +70,15 @@ public class LoadConverter implements RD
private PhysicalPlan physicalPlan;
private SparkContext sparkContext;
private JobConf jobConf;
+ private SparkEngineConf sparkEngineConf;
public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan,
- SparkContext sparkContext, JobConf jobConf) {
+ SparkContext sparkContext, JobConf jobConf, SparkEngineConf
sparkEngineConf) {
this.pigContext = pigContext;
this.physicalPlan = physicalPlan;
this.sparkContext = sparkContext;
this.jobConf = jobConf;
+ this.sparkEngineConf = sparkEngineConf;
}
@Override
@@ -106,7 +109,7 @@ public class LoadConverter implements RD
registerUdfFiles();
- ToTupleFunction ttf = new ToTupleFunction();
+ ToTupleFunction ttf = new ToTupleFunction(sparkEngineConf);
//create SparkCounter and set it for ToTupleFunction
boolean disableCounter = jobConf.getBoolean("pig.disable.counter",
false);
@@ -153,6 +156,12 @@ public class LoadConverter implements RD
private String counterName;
private SparkCounters sparkCounters;
private boolean disableCounter;
+ private SparkEngineConf sparkEngineConf;
+
+        /**
+         * Creates the function, holding a reference to the given engine configuration.
+         *
+         * @param sparkEngineConf engine state captured on the driver. NOTE(review): presumably
+         *        kept so it is serialized together with this function and its readObject
+         *        restores Pig's thread-local state in the executor. Confirm it is not otherwise
+         *        read.
+         */
+        public ToTupleFunction(SparkEngineConf sparkEngineConf) {
+            this.sparkEngineConf = sparkEngineConf;
+        }
@Override
public Tuple apply(Tuple2<Text, Tuple> v1) {
@@ -223,7 +232,6 @@ public class LoadConverter implements RD
jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
ObjectSerializer.serialize(inpSignatures));
jobConf.set(PigInputFormat.PIG_INPUT_LIMITS,
ObjectSerializer.serialize(inpLimits));
-
return jobConf;
}
Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Wed Oct 26
15:47:56 2016
@@ -66,7 +66,6 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
@@ -912,43 +911,5 @@ public class PigContext implements Seria
} else {
classloader = new ContextClassLoader(cl);
}
- }
-
- /**
- * In spark mode, JavaSerializer will serialize objects when the a task
starts(Detailed see PIG-4295)
- * packageImportList is a ThreadLocal variable which can not be serialized.
- * we overwrites writeObject method to serialize the value of
packageImportList.
- * @param out
- * @throws IOException
- */
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- if(getExecType() instanceof SparkExecType) {
- ArrayList<String> udfImportList = new ArrayList<String>();
- if (packageImportList.get() == null) {
- String udfImportListStr =
properties.getProperty("spark.udf.import.list");
- if (udfImportListStr != null) {
- udfImportList =
Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
- }
- } else {
- udfImportList = packageImportList.get();
- }
- out.writeObject(udfImportList);
- }
- }
-
- /**
- * packageImportList is ThreadLocal variable which can not be deserialized.
- * we overwrites readObject method to deserialize the value and set it to
packageImportList.
- * @param in
- * @throws IOException
- * @throws ClassNotFoundException
- */
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
- in.defaultReadObject();
- if(getExecType() instanceof SparkExecType) {
- ArrayList<String> udfImportList = (ArrayList<String>)
in.readObject();
- packageImportList.set(udfImportList);
- }
}
}
Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1766694&r1=1766693&r2=1766694&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Wed Oct 26
15:47:56 2016
@@ -76,7 +76,7 @@ public class UDFContext {
/*
* internal pig use only - should NOT be called from user code
*/
- HashMap<UDFContextKey, Properties> getUdfConfs() {
+ public HashMap<UDFContextKey, Properties> getUdfConfs() {
return udfConfs;
}
@@ -218,6 +218,14 @@ public class UDFContext {
jconf.get(CLIENT_SYS_PROPS));
}
+    /**
+     * Replaces udfConfs and clientSysProps with the objects encoded in the given strings, which
+     * are expected to come from ObjectSerializer#serialize.
+     *
+     * <p>Nothing is changed unless BOTH strings are non-null.
+     *
+     * @param udfConfsStr serialized udfConfs map, may be null
+     * @param clientSysPropsStr serialized client system properties, may be null
+     * @throws IOException propagated from ObjectSerializer#deserialize
+     */
+    @SuppressWarnings("unchecked")
+    public void deserializeForSpark(String udfConfsStr, String clientSysPropsStr) throws IOException {
+        if (udfConfsStr == null || clientSysPropsStr == null) {
+            return;
+        }
+        udfConfs = (HashMap<UDFContextKey, Properties>) ObjectSerializer.deserialize(udfConfsStr);
+        clientSysProps = (Properties) ObjectSerializer.deserialize(clientSysPropsStr);
+    }
+
private UDFContextKey generateKey(Class<?> c, String[] args) {
return new UDFContextKey(c.getName(), args);
}
@@ -314,4 +322,8 @@ public class UDFContext {
}
}
+    /**
+     * Returns the clientSysProps object held by this context. This is the live instance, not a copy.
+     *
+     * @return the client system properties currently stored in this context
+     */
+    public Properties getClientSysProps() {
+        return this.clientSysProps;
+    }
+
}
\ No newline at end of file