[ https://issues.apache.org/jira/browse/PIG-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950940#comment-15950940 ]
Adam Szita commented on PIG-5207:
---------------------------------

This test case uses the COR UDF to calculate the correlation between three variables. The cause of the issue is that when execution reaches the Final implementation of the COR UDF, there are junk bags in the input Tuple:
{code}
--input
 |--DefaultBag: {((943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)}
 |--DefaultBag: {((157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)}
 |--InternalCachedBag:
    |--0 = {BinSedesTuple@7900} "(157499.53767599948,19810.98000000007,52620.35740000006,52620.35740000006,503441.4849212208)"
    |--1 = {Long@7901} "10000"
    |--2 = {BinSedesTuple@7902} "(943629.1899999954,19810.98000000007,476680.0,52620.35740000006,2.5723842E7)"
    |--3 = {Long@7903} "10000"
    |--4 = {BinSedesTuple@7904} "(2509050.8495000014,52620.35740000006,476680.0,503441.4849212208,2.5723842E7)"
    |--5 = {Long@7905} "10000"
{code}
The real result to be consumed is at position 2, but since the implementation expects a single entry here it queries {{.get(0)}}, and shortly after we get a ClassCastException.
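The failure mode can be sketched in a few lines. This is a minimal stand-in illustration, not Pig's real code: {{DefaultBag}} and {{InternalCachedBag}} below are hypothetical marker classes, and a plain {{List<Object>}} stands in for the input Tuple.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical marker classes standing in for Pig's bag types.
public class JunkBagDemo {
    static class DefaultBag {}
    static class InternalCachedBag {}

    // COR.Final expects its intermediate result at input position 0, but the
    // broken plan puts two residual junk bags in front of it.
    public static boolean getZeroHitsJunkBag() {
        List<Object> inputTuple = Arrays.asList(
                new DefaultBag(),       // residual projection
                new DefaultBag(),       // residual projection
                new InternalCachedBag() // the entry the UDF actually needs (position 2)
        );
        try {
            // the implementation effectively does input.get(0) and casts
            InternalCachedBag bag = (InternalCachedBag) inputTuple.get(0);
            return false; // would mean position 0 held the expected bag
        } catch (ClassCastException e) {
            return true;  // position 0 held a junk bag, as observed in the bug
        }
    }

    public static void main(String[] args) {
        System.out.println("get(0) hits a junk bag: " + getZeroHitsJunkBag());
    }
}
```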
*This is caused by a fault in the Spark CombinerOptimizer: it doesn't remove all of the original input projections from the POUserFunc Final part*, as seen [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L299]:
{{PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);}}
Only the first input is handled, which is why we didn't see this issue with single-input UDFs such as AVG. This shows in the plan:
{code}
Spark node scope-38
D: Store(file:/tmp/temp-1343607396/tmp944122059:org.apache.pig.impl.io.InterStorage) - scope-37
|
|---D: New For Each(false,true)[tuple] - scope-47 (postReduceFE)
    |   |
    |   Project[chararray][0] - scope-39
    |   |
    |   Project[bag][1] - scope-43
    |   |
    |   |---Project[bag][1] - scope-42
    |   |
    |   POUserFunc(org.apache.pig.builtin.COR$Final)[bag] - scope-46
    |   |
    |   |---Project[bag][0] - scope-41   << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-40
    |   |
    |   |---Project[bag][2] - scope-45   << residual unneeded projection, remains from the original UDF's input
    |   |   |
    |   |   |---Project[bag][1] - scope-44
    |   |
    |   |---Project[bag][1] - scope-67   << actual subresult comes in here
    |
    |---C: Reduce By(false,false)[tuple] - scope-57 (cfe)
        |   |
        |   Project[chararray][0] - scope-58
{code}
After fixing this I saw that although I now get correct results, their order is off: the actual correlation values between var0-var1, etc. are shifted with respect to MR's output. This is due to another plan generation bug, which is actually observable in the plan above: {{POUserFunc}} has Projects {{0,2,1}} as its input (instead of {{0,1,2}}). The root of this problem is the cloning of the ForEach operator [here|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java#L141].
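To illustrate why removing only predecessor 0 is insufficient, here is a hypothetical mini-model (not Pig's real PhysicalPlan API): a plain list of strings stands in for {{pplan.getPredecessors(combineUdf)}}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Mini-model of the optimizer step; names are illustrative only.
public class RemoveAllInputsDemo {

    private static List<String> corFinalInputs() {
        // the three input projections of COR.Final, in the order seen in the plan
        return new ArrayList<>(Arrays.asList("Project[0]", "Project[2]", "Project[1]"));
    }

    // mirrors the buggy code path: only predecessor 0 is detached
    public static int remainingAfterBuggyRemoval() {
        List<String> inputs = corFinalInputs();
        inputs.remove(inputs.get(0));
        return inputs.size(); // two residual projections remain -> junk bags
    }

    // the fix: detach every original input projection
    public static int remainingAfterFixedRemoval() {
        List<String> inputs = corFinalInputs();
        while (!inputs.isEmpty()) {
            inputs.remove(inputs.get(0));
        }
        return inputs.size();
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + remainingAfterBuggyRemoval() + " input(s) left");
        System.out.println("fixed: " + remainingAfterFixedRemoval() + " input(s) left");
    }
}
```

With a single-input UDF such as AVG the buggy path happens to remove the only input, which is why the problem stayed hidden until a multi-input UDF like COR was used.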
Cloning this triggers cloning of the associated PhysicalPlan instance, and unfortunately [that method|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java#L217] has a bug: it doesn't keep the order of the {{List<PhysicalOperator>}} lists in {{mToEdges}}, because it only considers {{mFromEdges}} when connecting the cloned operator instances. Keeping the ordering cannot be achieved by looking only at {{mToEdges}} or only at {{mFromEdges}}, since both operate with lists. As a fix, I'm sorting these lists in the cloned plan according to their order in the original plan (using the {{matches}} map that maps original ops to cloned ops).

This doesn't come up in MR mode because the ForEach op is not cloned there but modified in place during combiner optimization. We could do the same in Spark too, but I feel this should rather be fixed in the common code for future clone() convenience.

[~kellyzly] can you review the Spark parts in CombinerOptimizer.java please? [~rohini] can you please review the common parts in PhysicalPlan.java? Please find the fix in my patch: [^PIG-5207.0.patch]

> BugFix e2e tests fail on spark
> ------------------------------
>
>            Key: PIG-5207
>            URL: https://issues.apache.org/jira/browse/PIG-5207
>        Project: Pig
>     Issue Type: Sub-task
>     Components: spark
>       Reporter: Adam Szita
>       Assignee: Adam Szita
>        Fix For: spark-branch
>
>    Attachments: PIG-5207.0.patch
>
>
> Observed ClassCastException in BugFix 1 and 2 test cases. The exception is thrown from a UDF: COR.Final

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
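(Addendum) The reordering idea from the clone() fix above can be sketched as follows. This is a hypothetical, self-contained illustration using plain strings, not Pig's PhysicalOperator/PhysicalPlan types: {{matches}} maps each original operator to its clone, and the cloned successor list is re-sorted to follow the original list's order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CloneOrderDemo {

    // Re-sort 'cloned' so its elements follow the order of their originals.
    public static <T> void sortLikeOriginal(List<T> original, Map<T, T> matches, List<T> cloned) {
        Map<T, Integer> rank = new HashMap<>();
        for (int i = 0; i < original.size(); i++) {
            // position each clone should end up in, per the original ordering
            rank.put(matches.get(original.get(i)), i);
        }
        cloned.sort(Comparator.comparing(op -> rank.get(op)));
    }

    public static List<String> reordered() {
        List<String> original = Arrays.asList("p0", "p1", "p2");
        Map<String, String> matches = new HashMap<>();
        matches.put("p0", "c0");
        matches.put("p1", "c1");
        matches.put("p2", "c2");
        // clone() rebuilt the edge list out of order (0,2,1), as seen in the plan
        List<String> cloned = new ArrayList<>(Arrays.asList("c0", "c2", "c1"));
        sortLikeOriginal(original, matches, cloned);
        return cloned;
    }

    public static void main(String[] args) {
        System.out.println(reordered()); // [c0, c1, c2]
    }
}
```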