[ 
https://issues.apache.org/jira/browse/PIG-4295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

liyunzhang_intel updated PIG-4295:
----------------------------------
    Attachment: PIG-4295_3.patch

[~mohitsabharwal]:
 Thanks for your review. For your suggestions, following are my comment:
{quote}
(1) In SparkLauncher, when saving the udf import list, I think it's safer to 
include the {{udf.import.list}} property, if it's already populated, i.e. 
something like
{code}
+    private void saveUdfImportList(PigContext pigContext) {
+        String propertyVal = pigContext.getProperties().get("udf.import.list");
+        String udfImportList = 
Joiner.on(",").join(PigContext.getPackageImportList());
+        if (propertyVal != null) {
+            udfImportList = udfImportList + "," + propertyVal;
+        }
+        pigContext.getProperties().setProperty("spark.udf.import.list", 
udfImportList);
+    }
{code}
{quote}

we need not include the {{udf.import.list}} property in 
SparkLauncher#saveUdfImportList because before SparkLauncher#saveUdfImportList, 
{{udf.import.list}} will be stored in PigContext#packageImportList in 
PigContext#init
{code}
  private void init() {
        if (properties.get("udf.import.list")!=null)
               
PigContext.initializeImportList((String)properties.get("udf.import.list"));
    }
{code}


{quote}
(2) It's also safer I think, in both writeObject and readObject to include the 
Spark specific property only conditioned on Spark engine.

{code}
if (Util.isSparkExecType(getExecutionEngine()) {
+        if (packageImportList.get() == null) {
+            String udfImportListStr = 
properties.getProperty("spark.udf.import.list");
+            if (udfImportListStr != null) {
+                udfImportList = 
Lists.newArrayList(Splitter.on(",").split(udfImportListStr));
+            }
+        } else {
+            udfImportList = packageImportList.get();
+        }
+        out.writeObject(udfImportList);
}

if (Util.isSparkExecType(getExecutionEngine()) {
+        ArrayList<String> udfImportList = (ArrayList<String>) in.readObject();
+        packageImportList.set(udfImportList);
}
{code}

This way there is no accidental risk of overwriting {{packageImportList}} for 
MR/Tez. There is also no reason to serialize the udf import list explicitly for 
MR and Tez engines.

{quote}

I have changed the code according to your comment.

{quote}
(3) Could you also please add few comments on SparkLauncher#saveUdfImportList 
for the reason behind the new property {{spark.udf.import.list}}. Thanks!
{quote}
I have changed the code according to your comment.

> Enable unit test "TestPigContext" for spark
> -------------------------------------------
>
>                 Key: PIG-4295
>                 URL: https://issues.apache.org/jira/browse/PIG-4295
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>    Affects Versions: spark-branch
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4295.patch, PIG-4295_1.patch, PIG-4295_2.patch, 
> PIG-4295_3.patch, TEST-org.apache.pig.test.TestPigContext.txt
>
>
> error log is attached



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to