[ https://issues.apache.org/jira/browse/PIG-3212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13587361#comment-13587361 ]
Dmitriy V. Ryaboy commented on PIG-3212: ---------------------------------------- Ok so I think the fix works, but it seems like this bug has been there for a long time -- the synchronization issue with SMM would've been there before 0.11, as well. Is it just more visible now because SMM is faster (fewer bags to go through)? It seems unlikely that we ever get to spill an internal sorted bag, given this patch.. seems like it almost always has an iterator open. If the concern is using the same comparator -- could we not solve this by initializing a new comparator for every bag? > Race Conditions in POSort and (Internal)SortedBag during Proactive Spill. > ------------------------------------------------------------------------- > > Key: PIG-3212 > URL: https://issues.apache.org/jira/browse/PIG-3212 > Project: Pig > Issue Type: Bug > Affects Versions: 0.11 > Reporter: Kai Londenberg > Priority: Critical > Fix For: 0.12, 0.11.1 > > Attachments: PIG-3212-p1.patch > > > The following bug exists in the latest release of Pig 0.11.0 > While running some large jobs involving groups and sorts like these: > {code} > events_by_user = GROUP events BY user_id; > sorted_events_by_user = FOREACH events_by_user { > A = ORDER events BY ts, split_idx, line_num; > GENERATE group, A; > } > {code} > I got a pretty strange behaviour: While this worked on small datasets, if I > ran it on large datasets, the results were sometimes not sorted perfectly. > So after a long debugging session, I tracked it down to at least one race > condition: > The following partial stack trace shows how a proactive spill gets triggered > on an InternalSortedBag. A spill in turn triggers a sort of that > InternalSortedBag. > {code} > at > org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83) > at > org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455) > at > org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243) > at > sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138) > at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171) > at > sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272) > at sun.management.Sensor.trigger(Sensor.java:120) > {code} > At the same time, the same InternalSortedBag might be sorted or accessed > within a POSort Operation. For example using the following Code path (line > numbers might be off, I had to add debug statements to diagnose this) > {code} > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368) > at > org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413) > at > org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257) > at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176) > at > org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566) > at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408) > at org.apache.hadoop.mapred.Child.main(Child.java:170) > {code} > The key here is: Both operations try to compare and modify elements of the > SortedBag simultaneously. This leads to all kinds of problems, most notably > incorrectly sorted data. > POSort.SortComparator that's passed as a Comparison function to > (Internal)SortedBag is not thread safe, since it works by attaching single > input tuples to PhysicalOperator's - these Operators in turn are part of the > POSort.sortPlans and are re-used among each thread accessing the > (Internal)SortedBag. > -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira