[
https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950940#comment-15950940
]
Adam Szita commented on PIG-5207:
---------------------------------
This test case uses the COR UDF to calculate the correlation between 3 variables.
The cause of the issue is that when execution reaches the Final implementation
of the COR UDF, there are some junk bags in the input Tuple:
{code}
--input
|--DefaultBag:
{((943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)}
|--DefaultBag:
{((157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)}
|--InternalCachedBag:
|--0 = {BinSedesTuple@7900}
"(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)"
|--1 = {Long@7901} "10000"
|--2 = {BinSedesTuple@7902}
"(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)"
|--3 = {Long@7903} "10000"
|--4 = {BinSedesTuple@7904}
"(2509050.8495000014,52620.35740000006,476680.0,503441.4849212208,2.5723842E7)"
|--5 = {Long@7905} "10000"
{code}
The real result to be consumed is at position 2, but since the implementation
expects a single entry here it queries {{.get(0)}}, and shortly afterwards we
get a ClassCastException.
*This is caused by a fault in Spark's CombinerOptimizer: it doesn't remove
all of the original input projections from the POUserFunc Final part,* as seen
[here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]:
{{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}}
//only the first input is handled (this is why we didn't see this issue when
using, for example, the AVG UDF)
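To illustrate the effect (a minimal self-contained sketch with plain strings standing in for Pig's PhysicalOperator instances, not actual Pig code): a multi-input UDF like COR has several input projections, so detaching only {{getPredecessors(...).get(0)}} leaves stale projections attached, and COR$Final later sees junk bags alongside the real subresult.

```java
import java.util.*;

// Hypothetical sketch: strings stand in for the UDF's predecessor operators,
// named after the plan dump below.
public class ResidualInputSketch {
    public static void main(String[] args) {
        // Input projections feeding POUserFunc(COR$Final).
        List<String> udfInputs = new ArrayList<>(Arrays.asList(
                "Project[bag][0] - scope-41",
                "Project[bag][2] - scope-45",
                "Project[bag][1] - scope-67")); // only this one carries the subresult

        // Buggy behaviour: only the first predecessor is detached,
        // leaving two residual projections in front of the real input.
        List<String> buggy = new ArrayList<>(udfInputs);
        buggy.remove(0);

        // Fixed behaviour: detach every original input projection, then keep
        // only the single projection that delivers the combined subresult.
        List<String> fixed = new ArrayList<>(udfInputs);
        fixed.clear();
        fixed.add("Project[bag][1] - scope-67");

        System.out.println("buggy inputs: " + buggy); // two stale projections remain
        System.out.println("fixed inputs: " + fixed);
    }
}
```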
As seen in the plan:
{code}
Spark node scope-38
D:
Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage)
- scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
| |
| Project[chararray][0] - scope-39
| |
| Project[bag][1] - scope-43
| |
| |---Project[bag][1] - scope-42
|
| POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
| |
| |---Project[bag][0] - scope-41 << residual unneeded projection,
remains from the original UDF's input
| | |
| | |---Project[bag][1] - scope-40
| |
| |---Project[bag][2] - scope-45 << residual unneeded projection,
remains from the original UDF's input
| | |
| | |---Project[bag][1] - scope-44
| |
| |---Project[bag][1] - scope-67 << actual subresult comes in here
|
|---C: Reduce By(false,false)[tuple] - scope-57 (cfe)
| |
| Project[chararray][0] - scope-58
{code}
After fixing this I saw that, although the results are now correct, their
order is off: the actual correlation values between var0-var1, etc. are
shifted with respect to MR's output.
This is due to another plan generation bug, and it is actually observable in
the plan above: {{POUserFunc}} has Projects {{0,2,1}} as its input (instead of
{{0,1,2}}).
The root of this problem is the cloning of the ForEach operator
[here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141].
Cloning this will trigger cloning the associated PhysicalPlan instance, and
unfortunately [that
method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217]
has a bug:
it doesn't preserve the order of the {{List<PhysicalOperator>}} lists in
{{mToEdges}}, because it only considers {{mFromEdges}} when connecting the
cloned operator instances.
Keeping the ordering cannot be achieved by looking only at {{mToEdges}} or
only at {{mFromEdges}}, since both operate with lists.
As a fix I'm sorting these lists in the cloned plan according to the ordering
in the original plan (using the {{matches}} map, which maps original ops to
their clones).
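The reordering step can be sketched like this (a minimal self-contained sketch, assuming plain strings stand in for PhysicalOperator instances and a plain Map stands in for the {{matches}} map; not Pig's actual code):

```java
import java.util.*;

// Hypothetical sketch: restore the original predecessor ordering in a cloned
// plan using a matches map (original op -> cloned op).
public class CloneOrderSketch {
    public static void main(String[] args) {
        // Predecessor list of the UDF in the original plan; order matters.
        List<String> original = Arrays.asList("p0", "p1", "p2");

        // A clone built only from mFromEdges can rediscover the edges in a
        // different order, e.g. [p0c, p2c, p1c] -> Project 0,2,1 in the plan.
        List<String> cloned = new ArrayList<>(Arrays.asList("p0c", "p2c", "p1c"));

        // matches map produced during cloning: original op -> cloned op.
        Map<String, String> matches = new HashMap<>();
        matches.put("p0", "p0c");
        matches.put("p1", "p1c");
        matches.put("p2", "p2c");

        // Fix: rank each cloned op by its original's position, then sort the
        // cloned edge list by that rank.
        Map<String, Integer> rank = new HashMap<>();
        for (int i = 0; i < original.size(); i++) {
            rank.put(matches.get(original.get(i)), i);
        }
        cloned.sort(Comparator.comparingInt(rank::get));

        System.out.println(cloned); // prints [p0c, p1c, p2c]
    }
}
```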
This doesn't come up in MR mode because the ForEach op is not cloned there but
rather modified in place during combiner optimization. We could do the same in
Spark too, but I feel this should rather be fixed in the common code for the
convenience of future clone() callers.
[~kellyzly] can you review the spark parts in CombinerOptimizer.java please?
[~rohini] can you please review the common parts in PhysicalPlan.java?
Please find the fix in my patch: [^PIG-5207.0.patch]
> BugFix e2e tests fail on spark
> ------------------------------
>
> Key: PIG-5207
> URL: https://issues.apache.org/jira/browse/PIG-5207
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Adam Szita
> Assignee: Adam Szita
> Fix For: spark-branch
>
> Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is
> thrown from a UDF: COR.Final
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)