Carlos Balduz created PIG-4313:
----------------------------------

             Summary: StackOverflowError in LIMIT operation on Spark
                 Key: PIG-4313
                 URL: https://issues.apache.org/jira/browse/PIG-4313
             Project: Pig
          Issue Type: Bug
          Components: spark
            Reporter: Carlos Balduz
            Assignee: Carlos Balduz


The current implementation of the LIMIT operation on Spark does not take the 
limit specified in the script into account while iterating over the tuples: 
POOutputConsumerIterator keeps iterating over all of them even after the limit 
has been reached. It uses POLimit's getNextTuple method to fetch the next 
result, and that method returns a Result with STATUS_EOP status once the limit 
has been reached. Since POOutputConsumerIterator recursively calls its readNext 
method whenever STATUS_EOP is returned, one stack frame is pushed per remaining 
input tuple, and a StackOverflowError is thrown for large datasets.
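
To make the failure mode concrete, here is a minimal, self-contained Java 
sketch of that pattern. The class and field names are illustrative placeholders 
I made up, not the actual Pig source; only the recursive readNext mirrors the 
code path visible in the trace below:

    import java.util.Iterator;

    // Sketch only: stands in for the POOutputConsumerIterator pattern.
    class RecursiveLimitIterator implements Iterator<String> {
        private final Iterator<String> input;
        private final int limit;
        private int emitted = 0;
        private String next;

        RecursiveLimitIterator(Iterator<String> input, int limit) {
            this.input = input;
            this.limit = limit;
            readNext();
        }

        // Analogue of readNext(): when the wrapped operator reports
        // STATUS_EOP (here: the limit has already been reached), it recurses
        // to try the next input tuple. That pushes one stack frame per
        // remaining tuple, hence the StackOverflowError on large datasets.
        private void readNext() {
            if (!input.hasNext()) {
                next = null;
                return;
            }
            String tuple = input.next();
            if (emitted >= limit) {   // stands in for POLimit returning STATUS_EOP
                readNext();           // unbounded recursion over the remaining input
                return;
            }
            emitted++;
            next = tuple;
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public String next() {
            String current = next;
            readNext();
            return current;
        }
    }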

I have solved this by creating a subclass of POOutputConsumerIterator that 
takes the limit into account when reading results, so that iteration ends as 
soon as the desired number of STATUS_OK results has been produced.
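
A sketch of a fix along those lines, reusing the placeholder class from the 
sketch above (illustrative only, not the actual patch):

    // Fixed readNext(): once `limit` STATUS_OK results have been produced,
    // stop immediately instead of recursing over the remaining input, so
    // the stack depth stays constant regardless of input size.
    private void readNext() {
        if (emitted >= limit || !input.hasNext()) {
            next = null;   // end of iteration; no further frames pushed
            return;
        }
        next = input.next();
        emitted++;
    }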

2014-11-07 13:35:33,715 [Result resolver thread-0] WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, master): java.lang.StackOverflowError:
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatterBuilder$MatchingParser.parseInto(DateTimeFormatterBuilder.java:2793)
        org.joda.time.format.DateTimeFormatterBuilder$Composite.parseInto(DateTimeFormatterBuilder.java:2695)
        org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:846)
        org.apache.pig.builtin.ToDate.extractDateTime(ToDate.java:124)
        org.apache.pig.builtin.Utf8StorageConverter.bytesToDateTime(Utf8StorageConverter.java:541)
        org.apache.pig.impl.util.CastUtils.convertToType(CastUtils.java:61)
        org.apache.pig.builtin.PigStorage.applySchema(PigStorage.java:339)
        org.apache.pig.builtin.PigStorage.getNext(PigStorage.java:282)
        arq.hadoop.pig.loaders.PigStorage.getNext(PigStorage.java:49)
        org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader.nextKeyValue(PigRecordReader.java:204)
        org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:138)
        org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:95)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter$LimitFunction$LimitConsumerIterator.readNext(LimitConverter.java:98)
        ...

        (~1000 lines)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
