[
https://issues.apache.org/jira/browse/PIG-4232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14176692#comment-14176692
]
liyunzhang_intel commented on PIG-4232:
---------------------------------------
Scripting.pig
{code}
register '/home/zly/prj/oss/pig/bin/libexec/python/scriptingudf.py' using jython as myfuncs;
a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
b = foreach a generate age;
explain b;
store b into '/user/pig/out/root-1412926432-nightly.conf/Scripting_1.out';
{code}
*Scripting.pig succeeds in spark mode.*
Scripting.udf.pig
{code}
register '/home/zly/prj/oss/pig/bin/libexec/python/scriptingudf.py' using jython as myfuncs;
a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
b = foreach a generate myfuncs.square(age);
explain b;
store b into '/user/pig/out/root-1412926432-nightly.conf/Scripting_1.out';
{code}
*This script fails in spark mode.*
After debugging, I found that UDFContext is not initialized in the Spark executors.
In
https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247
{code}
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
mRequiredColumns =
(boolean[])ObjectSerializer.deserialize(p.getProperty(signature));
{code}
When executing Scripting.pig, UDFContext.getUDFContext().getUDFProperties(this.getClass()) returns a Properties object that contains the PigStorage info. After deserialization, the variable "mRequiredColumns" is correctly initialized.
When executing Scripting.udf.pig, the same call returns a Properties object that does not contain the PigStorage info, so after deserialization the variable "mRequiredColumns" is null.
*Where is UDFContext set up?*
{code}
LoadConverter#convert
  -> SparkUtil#newJobConf

public static JobConf newJobConf(PigContext pigContext) throws IOException {
    JobConf jobConf = new JobConf(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
    // serialize all udf info (including PigStorage info) to jobConf
    UDFContext.getUDFContext().serialize(jobConf);
    jobConf.set("udf.import.list",
            ObjectSerializer.serialize(PigContext.getPackageImportList()));
    return jobConf;
}

PigInputFormat#passLoadSignature
  -> MapRedUtil.setupUDFContext(conf)
  -> UDFContext.setUDFContext

public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) {
        udfc.deserialize(); // UDFContext deserializes from jobConf
    }
}
{code}
In LoadConverter#convert, all udf info (including PigStorage's) is first serialized into the jobConf. Then, in PigInputFormat#passLoadSignature, the UDFContext is deserialized from the jobConf.
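The hand-off between the two sides can be sketched in plain Java. This is a minimal sketch, not Pig's actual ObjectSerializer: a HashMap stands in for the JobConf, and plain Java serialization plus Base64 stands in for the real encoding.

```java
import java.io.*;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConfRoundTripDemo {
    // turn any Serializable into a string suitable for a conf value
    static String serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return Base64.getEncoder().encodeToString(bos.toByteArray());
    }

    // restore the object on the other side
    static Object deserialize(String s) throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(s);
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // front end (the LoadConverter#convert side): stash udf properties in the conf
        Properties udfProps = new Properties();
        udfProps.setProperty("pig.loader.signature", "a_0");
        Map<String, String> fakeJobConf = new HashMap<>();
        fakeJobConf.put("pig.udf.context", serialize(udfProps));

        // back end (the PigInputFormat#passLoadSignature side): restore from the conf
        Properties restored =
                (Properties) deserialize(fakeJobConf.get("pig.udf.context"));
        System.out.println(restored.getProperty("pig.loader.signature"));
    }
}
```

Because the conf is shipped with the task, this string round-trip is what carries the front-end state into a thread (or JVM) that never saw the original objects.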
*Why are serialization and deserialization needed?*
UDFContext#tss is a ThreadLocal variable, so its value differs between threads. LoadConverter#convert is executed in Thread-A, while PigInputFormat#passLoadSignature is executed in Thread-B, because Spark sends its tasks to executors for execution; that is, PigInputFormat#passLoadSignature actually runs inside a Spark executor (Thread-B). Serialization and deserialization are used to initialize the UDFContext's value across threads.
{code}
UDFContext.java

private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
    @Override
    public UDFContext initialValue() {
        return new UDFContext();
    }
};

public static UDFContext getUDFContext() {
    UDFContext res = tss.get();
    return res;
}
{code}
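A minimal standalone sketch (plain JDK, no Pig classes) shows why the executor thread cannot see state written on the driver thread: each thread that touches the ThreadLocal gets its own fresh instance.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalDemo {
    // mirrors UDFContext#tss: every thread lazily gets its own instance
    private static final ThreadLocal<StringBuilder> tss =
            ThreadLocal.withInitial(StringBuilder::new);

    // returns what a freshly started worker thread observes in its copy
    static String valueSeenByWorker() throws InterruptedException {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(tss.get().toString()));
        worker.start();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        tss.get().append("set-on-main"); // written on the main thread only
        System.out.println("main sees:   '" + tss.get() + "'");
        // the worker sees an empty, freshly initialized instance
        System.out.println("worker sees: '" + valueSeenByWorker() + "'");
    }
}
```

This is exactly the situation on a Spark cluster: whatever the driver put into its UDFContext is gone in the executor thread unless it was serialized into the jobConf and deserialized on arrival.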
*Why does the behavior differ with and without a udf?*
With a udf:
Before PigInputFormat#passLoadSignature is executed, POUserFunc#setFuncInputSchema is executed. In POUserFunc#setFuncInputSchema, an entry is put into UDFContext#udfConfs (the key is POUserFunc, the value is an empty Properties object). When PigInputFormat#passLoadSignature is then executed, the condition for deserializing the UDFContext (udfc.isUDFConfEmpty()) is no longer met, so deserialization is skipped.
{code}
POUserFunc#readObject
  -> POUserFunc#instantiateFunc(FuncSpec)
  -> POUserFunc#setFuncInputSchema(String)

public void setFuncInputSchema(String signature) {
    Properties props =
            UDFContext.getUDFContext().getUDFProperties(func.getClass());
    Schema tmpS = (Schema) props.get("pig.evalfunc.inputschema." + signature);
    if (tmpS != null) {
        this.func.setInputSchema(tmpS);
    }
}

UDFContext.java

public Properties getUDFProperties(Class c) {
    UDFContextKey k = generateKey(c, null);
    Properties p = udfConfs.get(k);
    if (p == null) {
        p = new Properties(); // an empty Properties object
        udfConfs.put(k, p);   // udfConfs now holds an entry for POUserFunc
    }
    return p;
}
{code}
{code}
PigInputFormat#passLoadSignature
  -> MapRedUtil.setupUDFContext(conf)
  -> UDFContext.setUDFContext

public static void setupUDFContext(Configuration job) throws IOException {
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.addJobConf(job);
    // don't deserialize in front-end
    if (udfc.isUDFConfEmpty()) { // the condition for deserializing UDFContext
        udfc.deserialize();      // UDFContext deserializes from jobConf
    }
}
{code}
*My fix*
My current patch is as follows: isUDFConfEmpty now also returns true when every Properties object in udfConfs is empty.
{code}
 public boolean isUDFConfEmpty() {
-    return udfConfs.isEmpty();
+    if (udfConfs.isEmpty()) {
+        return true;
+    }
+    for (UDFContextKey udfContextKey : udfConfs.keySet()) {
+        if (!udfConfs.get(udfContextKey).isEmpty()) {
+            return false;
+        }
+    }
+    return true;
 }
{code}
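The intent of the patch can be checked with a standalone sketch. The map key is simplified to String here (the real key type is UDFContextKey), so this illustrates the logic rather than reproducing the actual UDFContext code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class UDFConfEmptyDemo {
    // stands in for UDFContext#udfConfs, with the key type simplified
    static boolean isUDFConfEmpty(Map<String, Properties> udfConfs) {
        if (udfConfs.isEmpty()) {
            return true;
        }
        // patched behavior: entries whose Properties are all empty still
        // count as "empty", so deserialize() is not skipped on the executor
        for (Properties p : udfConfs.values()) {
            if (!p.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Properties> udfConfs = new HashMap<>();
        // the empty entry that POUserFunc#setFuncInputSchema leaves behind
        udfConfs.put("POUserFunc", new Properties());
        System.out.println(isUDFConfEmpty(udfConfs)); // true under the patch

        // once an entry holds real state, the context is no longer "empty"
        udfConfs.get("POUserFunc").setProperty("pig.evalfunc.inputschema.sig", "age:int");
        System.out.println(isUDFConfEmpty(udfConfs));
    }
}
```

With this change, the stray empty entry created on the executor no longer blocks udfc.deserialize(), so the PigStorage properties are restored from the jobConf even when a udf is present.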
*My question: Is there a better way to fix this jira? My patch modifies UDFContext.java, a java file that is not in the package org.apache.pig.backend.hadoop.executionengine.spark.*
> UDFContext is not initialized in executors when running on Spark cluster
> ------------------------------------------------------------------------
>
> Key: PIG-4232
> URL: https://issues.apache.org/jira/browse/PIG-4232
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Praveen Rachabattuni
> Assignee: liyunzhang_intel
>
> UDFContext is used in a lot of features across the pig code base. For example it's
> used in PigStorage to pass columns information between the frontend and the
> backend code.
> https://github.com/apache/pig/blob/spark/src/org/apache/pig/builtin/PigStorage.java#L246-L247
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)