[ 
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950940#comment-15950940
 ] 

Adam Szita commented on PIG-5207:
---------------------------------

This test case uses the COR UDF to calculate the correlation between 3 variables.
The cause of the issue is that when execution reaches the Final implementation 
of the COR UDF, there are junk bags in the input Tuple:
{code}
--input
  |--DefaultBag: {((943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)}
  |--DefaultBag: {((157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)}
  |--InternalCachedBag: 
     |--0 = {BinSedesTuple@7900} "(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)"
     |--1 = {Long@7901} "10000"
     |--2 = {BinSedesTuple@7902} "(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)"
     |--3 = {Long@7903} "10000"
     |--4 = {BinSedesTuple@7904} "(2509050.8495000014,52620.35740000006,476680.0,503441.4849212208,2.5723842E7)"
     |--5 = {Long@7905} "10000"
{code}

The real result to be consumed is at position 2, but since the implementation 
expects a single entry here, it calls {{.get(0)}} and shortly afterwards throws 
a ClassCastException.
*This is caused by a fault in Spark's CombinerOptimizer: it does not remove all 
of the original input projections from the POUserFunc.Final part*, as seen 
[here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]:
{{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}} 
// only the first input is handled (this is why we didn't see this issue when 
using, for example, the AVG UDF)
As seen in the plan:
{code}
Spark node scope-38
D: Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage) - scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
    |   |
    |   Project[chararray][0] - scope-39
    |   |
    |   Project[bag][1] - scope-43
    |   |
    |   |---Project[bag][1] - scope-42
    |   
    |   POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
    |   |
    |   |---Project[bag][0] - scope-41   << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-40
    |   |
    |   |---Project[bag][2] - scope-45   << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-44
    |   |
    |   |---Project[bag][1] - scope-67   << actual subresult comes in here
    |
    |---C: Reduce By(false,false)[tuple] - scope-57  (cfe)
        |   |
        |   Project[chararray][0] - scope-58
{code}
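For illustration, here is a minimal sketch of the fix idea under simplified assumptions (operators are plain strings instead of {{PhysicalOperator}}, and {{keepOnlySubresult}} is a hypothetical helper, not a Pig API): every residual input projection of the Final UDF must be detached, not just {{getPredecessors(...).get(0)}}, so that only the actual subresult projection remains:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch, not Pig's real CombinerOptimizer code: operators are
// stand-in strings, and keepOnlySubresult is an invented helper. The point:
// when rewiring the Final UDF, all residual input projections must be
// detached, not only the first predecessor.
public class DetachAllInputs {

    // Drop all residual projections from the UDF's predecessor list so that
    // only the real subresult remains as its input.
    static List<String> keepOnlySubresult(List<String> predecessors,
                                          Set<String> residualProjections) {
        List<String> kept = new ArrayList<>();
        for (String p : predecessors) {
            if (!residualProjections.contains(p)) {
                kept.add(p);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // mirrors the plan above: scope-41 and scope-45 are residual,
        // scope-67 carries the actual subresult
        List<String> preds =
                List.of("Project-scope-41", "Project-scope-45", "Project-scope-67");
        Set<String> residual = Set.of("Project-scope-41", "Project-scope-45");
        System.out.println(keepOnlySubresult(preds, residual)); // [Project-scope-67]
    }
}
```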

After fixing this I saw that although I now get good results, their order is 
off: the actual correlation values between var0-var1, etc., are shifted with 
respect to MR's output.
This is due to another plan-generation bug, which is actually observable in the 
plan above: {{POUserFunc}} has Projects {{0,2,1}} as its input instead of 
{{0,1,2}}.

The root of this problem is the cloning of the ForEach operator 
[here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141].
Cloning it also triggers cloning of the associated PhysicalPlan instance, and 
unfortunately [that 
method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217]
has a bug:

It does not preserve the order of the {{List<PhysicalOperator>}} lists in 
{{mToEdges}}, because it only considers {{mFromEdges}} when connecting the 
cloned operator instances.
The ordering cannot be preserved by looking at only {{mToEdges}} or only 
{{mFromEdges}}, since both store lists.

As a fix, I sort these lists in the cloned plan according to their order in the 
original plan (using the {{matches}} map that maps original ops to their 
clones).
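To make the ordering fix concrete, here is a self-contained illustration under simplified assumptions (a "plan" is just a map from an operator name to the ordered list of its predecessors; {{reorder}} is a hypothetical helper, not the actual patch code): each predecessor list in the clone is re-sorted to match the original via the {{matches}} map:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained illustration, NOT Pig's actual classes: a "plan" here is
// just a map from an operator name to the ordered list of its predecessors.
public class CloneOrderFix {

    // Re-sort each predecessor list in the cloned plan so that it mirrors
    // the ordering of the corresponding list in the original plan.
    static void reorder(Map<String, List<String>> orig,
                        Map<String, List<String>> clone,
                        Map<String, String> matches) {
        // invert matches: cloned op -> original op
        Map<String, String> inverse = new HashMap<>();
        matches.forEach((o, c) -> inverse.put(c, o));
        for (Map.Entry<String, String> e : matches.entrySet()) {
            List<String> origList = orig.get(e.getKey());
            List<String> cloneList = clone.get(e.getValue());
            if (origList == null || cloneList == null) {
                continue;
            }
            // a cloned op goes to the position its original counterpart had
            cloneList.sort(Comparator.comparingInt(
                    c -> origList.indexOf(inverse.get(c))));
        }
    }

    public static void main(String[] args) {
        // original: the UDF's inputs are ordered 0,1,2
        Map<String, List<String>> orig = new HashMap<>();
        orig.put("udf", List.of("p0", "p1", "p2"));

        // the clone came out with inputs 0,2,1 -- the observed bug
        Map<String, List<String>> clone = new HashMap<>();
        clone.put("udf'", new ArrayList<>(List.of("p0'", "p2'", "p1'")));

        Map<String, String> matches = new HashMap<>();
        matches.put("udf", "udf'");
        matches.put("p0", "p0'");
        matches.put("p1", "p1'");
        matches.put("p2", "p2'");

        reorder(orig, clone, matches);
        System.out.println(clone.get("udf'")); // prints [p0', p1', p2']
    }
}
```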

This doesn't come up in MR mode because the ForEach operator is not cloned 
there but rather modified in place during combiner optimization. We could do 
the same in Spark, but I feel this should rather be fixed in the common code 
so that future uses of clone() benefit as well.

[~kellyzly] can you review the spark parts in CombinerOptimizer.java please?
[~rohini] can you please review the common parts in PhysicalPlan.java?

Please find the fix in my patch: [^PIG-5207.0.patch]

> BugFix e2e tests fail on spark
> ------------------------------
>
>                 Key: PIG-5207
>                 URL: https://issues.apache.org/jira/browse/PIG-5207
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Adam Szita
>            Assignee: Adam Szita
>             Fix For: spark-branch
>
>         Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is 
> thrown from a UDF: COR.Final



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
