[ 
https://issues.apache.org/jira/browse/PIG-3212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13588191#comment-13588191
 ] 

Kai Londenberg commented on PIG-3212:
-------------------------------------

A new Comparator for every bag would mean you need a new deep clone of the sort 
plan of POSort. That means a lot of small-object fodder for the garbage 
collector and would probably increase the running time.

When it comes to spilling an internal sorted bag: Do you really want such an 
operation to happen *while you are iterating* over the bag contents? If you 
allow that, synchronization is not the only issue: the state of the 
iterator (pointer to the current element) also needs to adapt accordingly. 

And what would you gain? The memory required for the bag won't increase while 
the iterator is running (since it's closed for modifications), so it can't 
cause out-of-memory exceptions itself. After the iterator is finished, the bag 
in its entirety is usually fodder for the garbage collector (at least in the 
context of POSort, this is always the case). If you allow the SMM to call spill 
on the bag while the iterator is running, all you gain is the possibility 
that some memory (which is actively being used) gets freed a little bit 
earlier than it otherwise would be. 

It's better to let the SMM do its work only while the InternalBag is being 
filled. That's exactly the point at which the SMM is likely to kick in, since 
the memory usage is increasing. The fix allows for that.
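
To make the "spill only while filling" idea concrete, here is a minimal 
sketch. It is not the code from the attached patch and not Pig's 
InternalSortedBag; the class FillOnlySpillBag and all of its members are 
hypothetical, and spilled runs are kept in memory as a stand-in for spill 
files. It only shows the locking discipline: spill() does real work while the 
bag is being filled and becomes a no-op once an iterator has been handed out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not Pig's InternalSortedBag.
class FillOnlySpillBag<T> {
    private final Comparator<T> comparator;
    private final Object lock = new Object();
    private List<T> inMemory = new ArrayList<>();
    // Stand-in for spill files: each run is a sorted chunk that was "spilled".
    private final List<List<T>> spilledRuns = new ArrayList<>();
    private boolean sealed = false; // true once iteration has started

    FillOnlySpillBag(Comparator<T> comparator) {
        this.comparator = comparator;
    }

    void add(T item) {
        synchronized (lock) {
            if (sealed) {
                throw new IllegalStateException("bag is closed for modifications");
            }
            inMemory.add(item);
        }
    }

    // Called from the memory manager's thread. Returns the number of
    // elements spilled; 0 means the request was ignored.
    long spill() {
        synchronized (lock) {
            if (sealed || inMemory.isEmpty()) {
                return 0; // never touch the contents during iteration
            }
            inMemory.sort(comparator); // comparator is only used under the lock
            spilledRuns.add(inMemory);
            long spilled = inMemory.size();
            inMemory = new ArrayList<>();
            return spilled;
        }
    }

    // Seals the bag, then sorts a private snapshot. After sealing, spill()
    // returns early, so the comparator is never used by two threads at once.
    Iterator<T> iterator() {
        List<T> all = new ArrayList<>();
        synchronized (lock) {
            sealed = true;
            for (List<T> run : spilledRuns) {
                all.addAll(run);
            }
            all.addAll(inMemory);
        }
        all.sort(comparator); // a real bag would merge the sorted runs instead
        return all.iterator();
    }
}
```

With this discipline the iterator never has to adapt its position to a 
concurrent spill, because no spill can happen once iteration has begun.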
                
> Race Conditions in POSort and (Internal)SortedBag during Proactive Spill.
> -------------------------------------------------------------------------
>
>                 Key: PIG-3212
>                 URL: https://issues.apache.org/jira/browse/PIG-3212
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.11
>            Reporter: Kai Londenberg
>            Priority: Critical
>             Fix For: 0.12, 0.11.1
>
>         Attachments: PIG-3212-p1.patch
>
>
> The following bug exists in Pig 0.11.0, the latest release.
> While running some large jobs involving groups and sorts like these:
> {code}
> events_by_user = GROUP events BY user_id;
> sorted_events_by_user = FOREACH events_by_user {
>       A = ORDER events BY ts, split_idx, line_num;
>       GENERATE group, A;
> }
> {code}
> I got some pretty strange behaviour: while this worked on small datasets, 
> when I ran it on large datasets the results were sometimes not sorted 
> correctly. After a long debugging session, I tracked it down to at least one 
> race condition:
> The following partial stack trace shows how a proactive spill gets triggered 
> on an InternalSortedBag. A spill in turn triggers a sort of that 
> InternalSortedBag.
> {code}
>       at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
>       at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
>       at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
>       at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
>       at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
>       at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
>       at sun.management.Sensor.trigger(Sensor.java:120)
> {code}
> At the same time, the same InternalSortedBag might be sorted or accessed 
> within a POSort operation, for example via the following code path (line 
> numbers might be off, since I had to add debug statements to diagnose this):
> {code}
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
>       at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
>       at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
>       at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>       at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
>       at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
>       at org.apache.hadoop.mapred.Child.main(Child.java:170)
> {code}
> The key here is: both operations try to compare and modify elements of the 
> SortedBag simultaneously. This leads to all kinds of problems, most notably 
> incorrectly sorted data.
> The POSort.SortComparator that is passed as the comparison function to 
> (Internal)SortedBag is not thread safe, since it works by attaching single 
> input tuples to PhysicalOperators. These operators in turn are part of 
> POSort.sortPlans and are shared by every thread accessing the 
> (Internal)SortedBag.
>  
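
The hazard described in the report can be illustrated with a small sketch. 
This is not Pig's POSort.SortComparator; KeyExtractor, SharedPlanComparator 
and InterleavingDemo are hypothetical stand-ins that only mimic the 
"attach an input tuple to a shared operator, then read the result" pattern. 
The demo replays one unlucky interleaving of two threads step by step in a 
single thread, so the outcome is deterministic.

```java
import java.util.Comparator;

// Hypothetical stand-in for an operator inside a shared sort plan.
class KeyExtractor {
    private int[] attached; // mutable state shared by every caller

    void attachInput(int[] tuple) {
        attached = tuple;
    }

    int getNext() {
        return attached[0]; // the sort key is the first field
    }
}

// Comparator that evaluates both sides through the same shared operator.
// Correct when used by one thread, unsafe when two threads compare at once.
class SharedPlanComparator implements Comparator<int[]> {
    private final KeyExtractor plan;

    SharedPlanComparator(KeyExtractor plan) {
        this.plan = plan;
    }

    @Override
    public int compare(int[] a, int[] b) {
        plan.attachInput(a);
        int keyA = plan.getNext(); // another thread may re-attach before this
        plan.attachInput(b);
        int keyB = plan.getNext();
        return Integer.compare(keyA, keyB);
    }
}

class InterleavingDemo {
    // Replays the steps of two concurrent compare() calls in a fixed order.
    static int keySeenByFirstThread() {
        KeyExtractor shared = new KeyExtractor();
        int[] a = {1};
        int[] x = {99};
        shared.attachInput(a);   // "thread 1" starts compare(a, ...)
        shared.attachInput(x);   // "thread 2" starts compare(x, ...)
        return shared.getNext(); // "thread 1" resumes and reads x's key
    }
}
```

In this interleaving the first thread compares with the key of a tuple it 
never attached, which is enough to produce an incorrectly sorted result.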

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
