So forcing the ShuffleMemoryManager to assume 32 cores, and therefore calculate a page size of 1MB, makes the tests pass.
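For reference, the arithmetic behind that observation works out roughly as follows. This is a back-of-the-envelope check only, using the maxMemory value from the getPageSize debug output quoted further down in this thread, and the divide-by-(cores * 16), round-to-power-of-two heuristic described below:

    // Rough arithmetic only; values come from the getPageSize debug output
    // quoted later in this thread.
    val maxMemory = 515396075L                        // ~491MB of execution memory
    val pageSizeWith8Cores  = maxMemory / (8 * 16)    // ~3.8MB, rounds up to 4MB (4194304)
    val pageSizeWith32Cores = maxMemory / (32 * 16)   // ~0.96MB, rounds up to 1MB (1048576)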
How can we determine the correct value to use in getPageSize rather than Runtime.getRuntime.availableProcessors()?

On 16 September 2015 at 10:17, Pete Robbins <robbin...@gmail.com> wrote:

I see what you are saying. Full stack trace:

java.io.IOException: Unable to acquire 4194304 bytes of memory
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
  at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
  at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
  at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.lang.Thread.run(Thread.java:785)

On 16 September 2015 at 09:30, Reynold Xin <r...@databricks.com> wrote:

Can you paste the entire stack trace of the error? In your original email you only included the last function call.

Maybe I'm missing something here, but I still think the bad heuristic is the issue.

Some operators pre-reserve memory before running anything in order to avoid starvation. For example, imagine we have an aggregate followed by a sort. If the aggregate has very high cardinality, uses up all the memory, and even starts spilling (falling back to sort-based aggregation), there is no memory available at all for the sort operator to use. To work around this, each operator reserves a page of memory before it processes any data.

Page size is computed by Spark as:

  total execution memory / (maximum number of active tasks * 16)

rounded up to the next power of 2 and capped between 1MB and 64MB.

That is to say, in the worst case we should be able to reserve at least 8 pages (16 rounded up to the next power of 2).

However, in your case the maximum number of active tasks is 32 (set by the test environment), while the page size is determined using the number of cores (8 in your case). So it is off by a factor of 4. As a result, with this page size, we can only reserve at least 2 pages. That is to say, if you have more than 3 operators that need a page reservation (e.g. an aggregate followed by a join on the group-by key followed by a shuffle, which seems to be the case for join31.q), the task can fail to reserve memory before running anything.

There is a second problem (maybe this is the one you were trying to point out?), which is that tasks running at the same time can compete with each other for memory. Spark allows each task to claim up to a 2/N share of memory, where N is the number of active tasks. If a task is launched before the others and quickly hogs a lot of memory, the tasks launched after it might not be able to get enough memory and will fail. This is not ideal, but probably fine because tasks can be retried, and can succeed on retry.

On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <robbin...@gmail.com> wrote:

OK, so let me try again ;-)

I don't think that the page size calculation matters, apart from hitting the allocation limit earlier if the page size is too large.

If a task is going to need X bytes, it is going to need X bytes. In this case, for at least one of the tasks, X > max_memory / no_active_tasks at some point during execution. A smaller page size may use the memory more efficiently but would not necessarily avoid this issue.

The next question would be: is the memory limit per task of max_memory / no_active_tasks reasonable? It seems fair, but currently an exception is thrown when this limit is reached; maybe the task could instead wait for no_active_tasks to decrease?

I think what causes my test issue is that the 32 tasks don't execute as quickly on my 8-core box, so more are active at any one time.

I will experiment with the page size calculation to see what effect it has.

Cheers,
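For concreteness, here is a rough Scala sketch of the heuristic Reynold describes above. The names (PageSizeSketch, pageSize, nextPowerOf2) are illustrative, not Spark's; the real logic lives in ShuffleMemoryManager.getPageSize (linked further down in the thread) and may differ in detail. The point of the sketch is that the divisor should reflect the expected number of concurrent tasks (for example SparkContext.defaultParallelism in local mode, as suggested elsewhere in this thread) rather than Runtime.getRuntime.availableProcessors():

    // Sketch only: page size as described above, not the actual ShuffleMemoryManager code.
    object PageSizeSketch {
      val minPageSize = 1L * 1024 * 1024    // 1MB floor
      val maxPageSize = 64L * 1024 * 1024   // 64MB cap

      // Round up to the next power of 2 (returns n if n is already a power of 2).
      private def nextPowerOf2(n: Long): Long =
        if (java.lang.Long.bitCount(n) == 1) n else java.lang.Long.highestOneBit(n) << 1

      // maxMemory / (expected concurrent tasks * 16), rounded up and clamped.
      def pageSize(maxMemory: Long, expectedConcurrentTasks: Int): Long = {
        val raw = maxMemory / (expectedConcurrentTasks * 16)
        math.min(maxPageSize, math.max(minPageSize, nextPowerOf2(raw)))
      }
    }

    // With the expected task count passed in (e.g. 32 in the test environment)
    // instead of the 8 physical cores, the page size drops from 4MB to 1MB,
    // leaving room for more per-operator page reservations:
    //   PageSizeSketch.pageSize(515396075L, 8)  == 4194304
    //   PageSizeSketch.pageSize(515396075L, 32) == 1048576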
On 16 September 2015 at 06:53, Reynold Xin <r...@databricks.com> wrote:

It is exactly the issue here, isn't it?

We are using memory / N, where N should be the maximum number of active tasks. In the current master we use the number of cores to approximate the number of tasks, but that turned out to be a bad approximation in tests because the test environment sets it to 32 to increase concurrency.

On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <robbin...@gmail.com> wrote:

Oops... I meant to say "The page size calculation is NOT the issue here".

On 16 September 2015 at 06:46, Pete Robbins <robbin...@gmail.com> wrote:

The page size calculation is the issue here, as there is plenty of free memory, although there is maybe a fair bit of wasted space in some pages. It is that when we have a lot of tasks, each is only allowed to reach 1/n of the available memory, and several of the tasks bump into that limit. With tasks at 4 times the number of cores there will be some contention, and so they remain active for longer.

So I think this is a test case issue: the number of executors is configured too high.

On 15 September 2015 at 18:54, Reynold Xin <r...@databricks.com> wrote:

Maybe we can change the heuristics in the memory calculation to use SparkContext.defaultParallelism in local mode.

On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <robbin...@gmail.com> wrote:

Yes, and at least there is an override by setting spark.sql.test.master to local[8]; in fact local[16] worked on my 8-core box.

I'm happy to use this as a workaround, but the hard-coded 32 will fail the build/tests on a clean checkout if you only have 8 cores.

On 15 September 2015 at 17:40, Marcelo Vanzin <van...@cloudera.com> wrote:

That test explicitly sets the number of executor cores to 32:

  object TestHive
    extends TestHiveContext(
      new SparkContext(
        System.getProperty("spark.sql.test.master", "local[32]"),

On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <r...@databricks.com> wrote:

Yea, I think this is where the heuristic is failing: it uses 8 cores to approximate the number of active tasks, but the tests are somehow using 32 (maybe because the test explicitly sets it, or you set it yourself? I'm not sure which).

On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <robbin...@gmail.com> wrote:

Reynold, thanks for replying.

getPageSize parameters: maxMemory=515396075, numCores=0
Calculated values: cores=8, default=4194304

So am I getting a large page size because I only have 8 cores?
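That debug output also shows where the 8 comes from. The following is an assumed sketch of the fallback implied by "numCores=0" turning into "cores=8" on an 8-core machine (effectiveCores is an illustrative name, not Spark's):

    // Sketch of the fallback suggested by the debug output above: a numCores
    // parameter of 0 means "use the local machine's core count".
    def effectiveCores(numCores: Int): Int =
      if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()

    // effectiveCores(0) == 8 on an 8-core box, so the page size lands at 4MB
    // even though the test harness runs up to 32 concurrent tasks.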
On 15 September 2015 at 00:40, Reynold Xin <r...@databricks.com> wrote:

Pete - can you do me a favor?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174

Print the parameters that are passed into the getPageSize function, and check their values.

On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <r...@databricks.com> wrote:

Is this on latest master / branch-1.5?

Out of the box we reserve only 16% (0.2 * 0.8) of the memory for execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at least one page for execution. If your page size is 4MB, it only takes 3 operators to use up a task's memory.

The thing is, page size is dynamically determined, and in your case it should be smaller than 4MB.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174

Maybe there is a place in the maven tests where we explicitly set the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and just remove it.

On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <robbin...@gmail.com> wrote:

I keep hitting errors running the tests on 1.5, such as:

- join31 *** FAILED ***
  Failed to execute query using catalyst:
  Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
  at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)

This is using the command

  build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test

I don't see these errors in any of the AMPLab Jenkins builds. Do those builds have any configuration/environment that I may be missing? My build is running with whatever defaults are in the top-level pom.xml, e.g. -Xmx3G.

I can make these tests pass by setting spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite rather than the default 0.2 value.

Trying to analyze what is going on with the test, it seems related to the number of active tasks, which rises to 32, so the ShuffleMemoryManager allows less memory per task even though most of those tasks do not have any memory allocated to them.

Has anyone seen issues like this before?
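To make the budget Reynold describes concrete, here is the arithmetic as a rough sketch. The 0.2 factor is the spark.shuffle.memoryFraction default and 0.8 is the safety factor Reynold mentions; the heap size, task count, and page size are the ones reported in this thread:

    // Rough budget arithmetic only; the exact accounting lives in ShuffleMemoryManager.
    val heap            = 3L * 1024 * 1024 * 1024   // -Xmx3G from the top-level pom.xml
    val memoryFraction  = 0.2                        // spark.shuffle.memoryFraction default
    val safetyFraction  = 0.8                        // the 0.8 safety factor mentioned above
    val executionMemory = (heap * memoryFraction * safetyFraction).toLong
    // == 515396075, the maxMemory value seen in the getPageSize debug output above

    val activeTasks   = 32                            // local[32] in TestHive
    val perTaskBudget = executionMemory / activeTasks // ~15MB per task
    val pagesPerTask  = perTaskBudget / (4L * 1024 * 1024)  // only ~3 pages of 4MB each

    // Raising spark.shuffle.memoryFraction to 0.6 roughly triples the per-task
    // budget, which is consistent with that workaround making the failures go away.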