so forcing the ShuffleMemoryManager to assume 32 cores, and therefore
calculate a page size of 1MB, passes the tests.

How can we determine the correct value to use in getPageSize rather than
Runtime.getRuntime.availableProcessors()?
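
One option (just a sketch, and it assumes the page size calculation can see
the SparkConf) would be to derive the maximum task count from the master
setting rather than from the hardware:

import org.apache.spark.SparkConf

// Hypothetical helper: estimate the max number of concurrent tasks from
// the conf instead of Runtime.getRuntime.availableProcessors().
def approxMaxConcurrentTasks(conf: SparkConf): Int = {
  val LocalN = """local\[([0-9]+)\]""".r
  conf.get("spark.master", "") match {
    case LocalN(n)  => n.toInt  // local[N]: at most N tasks run at once
    case "local"    => 1
    case "local[*]" => Runtime.getRuntime.availableProcessors()
    case _          => conf.getInt("spark.executor.cores",
                                   Runtime.getRuntime.availableProcessors())
  }
}

With that, the test's local[32] would feed 32 into getPageSize and give the
1MB page size that passes.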

On 16 September 2015 at 10:17, Pete Robbins <robbin...@gmail.com> wrote:

> I see what you are saying. Full stack trace:
>
> java.io.IOException: Unable to acquire 4194304 bytes of memory
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:349)
>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:478)
>       at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:138)
>       at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
>       at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
>       at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>       at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
>       at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>       at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
>       at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>       at org.apache.spark.scheduler.Task.run(Task.scala:88)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.lang.Thread.run(Thread.java:785)
>
>
> On 16 September 2015 at 09:30, Reynold Xin <r...@databricks.com> wrote:
>
>> Can you paste the entire stacktrace of the error? In your original email
>> you only included the last function call.
>>
>> Maybe I'm missing something here, but I still think the bad heuristic is
>> the issue.
>>
>> Some operators pre-reserve memory before running anything in order to
>> avoid starvation. For example, imagine we have an aggregate followed by a
>> sort. If the aggregate has very high cardinality, uses up all the memory,
>> and even starts spilling (falling back to sort-based aggregation), there
>> is no memory available at all for the sort operator to use. To work around
>> this, each operator reserves a page of memory before it processes any data.
>>
>> Page size is computed by Spark as:
>>
>> the total amount of execution memory / (maximum number of active tasks * 16)
>>
>> then rounded up to the next power of 2 and capped between 1MB and 64MB.
>>
>> That is to say, in the worst case each task should still be able to
>> reserve at least 8 pages (the factor of 16, at worst halved by rounding
>> the page size up to the next power of 2).
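>>
>> In rough Scala, that heuristic is something like the sketch below (an
>> illustration of the formula above, not the exact ShuffleMemoryManager
>> code):
>>
>> // Sketch: memory / (tasks * 16), rounded up to the next power of 2,
>> // clamped to [1MB, 64MB].
>> def defaultPageSize(maxMemory: Long, maxActiveTasks: Int): Long = {
>>   val minPageSize = 1L * 1024 * 1024    // 1MB
>>   val maxPageSize = 64L * minPageSize   // 64MB
>>   val raw = maxMemory / (maxActiveTasks * 16L)
>>   // next power of 2 >= raw
>>   val rounded = java.lang.Long.highestOneBit(math.max(raw - 1, 1L)) << 1
>>   math.min(maxPageSize, math.max(minPageSize, rounded))
>> }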
>>
>> However, in your case the max number of active tasks is 32 (set by the
>> test env), while the page size is determined using the number of cores
>> (8 in your case). So it is off by a factor of 4, and with this page size
>> each task is only guaranteed 2 pages. That is to say, if a task has 3 or
>> more operators that need a page reservation (e.g. an aggregate followed
>> by a join on the group-by key followed by a shuffle -- which seems to be
>> the case for join31.q), it can fail to reserve memory before running
>> anything.
>>
>>
>> There is a second problem (maybe this is the one you were trying to
>> point out?), which is that tasks running at the same time can compete
>> for memory with each other. Spark allows each task to claim up to a 2/N
>> share of memory, where N is the number of active tasks. If a task is
>> launched before the others and quickly hogs a lot of memory, the tasks
>> launched after it might not be able to get enough memory, and thus will
>> fail. This is not ideal, but probably fine because tasks can be retried,
>> and can succeed on retry.
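>>
>> (Concretely: with 480MB of execution memory and N = 32, a task's 1/N
>> share is 15MB. A task launched while N is still small can claim far more
>> than that, so a task launched later may be unable to reserve even its
>> first page.)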
>>
>>
>> On Wed, Sep 16, 2015 at 1:07 AM, Pete Robbins <robbin...@gmail.com>
>> wrote:
>>
>>> ok so let me try again ;-)
>>>
>>> I don't think the page size calculation matters, apart from hitting the
>>> allocation limit earlier if the page size is too large.
>>>
>>> If a task is going to need X bytes, it is going to need X bytes. In this
>>> case, for at least one of the tasks, X > maxMemory / numActiveTasks at
>>> some point during execution. A smaller page size may use the memory more
>>> efficiently but would not necessarily avoid this issue.
>>>
>>> The next question would be: is the per-task memory limit of
>>> maxMemory / numActiveTasks reasonable? It seems fair, but currently an
>>> exception is thrown when that limit is reached; maybe the task could
>>> instead wait for numActiveTasks to decrease?
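>>>
>>> Something like this toy sketch is what I have in mind (hypothetical
>>> code, not the real ShuffleMemoryManager, and a real version would need
>>> deadlock and starvation handling) -- block instead of throwing when a
>>> task hits its cap:
>>>
>>> import scala.collection.mutable
>>>
>>> class BlockingTaskMemoryPool(maxMemory: Long) {
>>>   private val held = mutable.HashMap[Long, Long]() // taskId -> bytes held
>>>
>>>   // Block until granting numBytes keeps this task within its 1/N share.
>>>   def acquire(taskId: Long, numBytes: Long): Unit = synchronized {
>>>     held.getOrElseUpdate(taskId, 0L)
>>>     def cap = maxMemory / held.size // N = currently active tasks
>>>     while (held(taskId) + numBytes > cap ||
>>>            held.values.sum + numBytes > maxMemory) {
>>>       wait() // wait for a task to finish instead of throwing IOException
>>>     }
>>>     held(taskId) += numBytes
>>>   }
>>>
>>>   // Called when a task completes: free its memory and wake blocked tasks.
>>>   def releaseAll(taskId: Long): Unit = synchronized {
>>>     held.remove(taskId)
>>>     notifyAll() // N decreased, so every remaining task's cap grew
>>>   }
>>> }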
>>>
>>> I think what causes my test issue is that the 32 tasks don't execute as
>>> quickly on my 8-core box, so more are active at any one time.
>>>
>>> I will experiment with the page size calculation to see what effect it
>>> has.
>>>
>>> Cheers,
>>>
>>>
>>>
>>> On 16 September 2015 at 06:53, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> It is exactly the issue here, isn't it?
>>>>
>>>> We are using memory / N, where N should be the maximum number of active
>>>> tasks. In the current master, we use the number of cores to approximate the
>>>> number of tasks -- but it turned out to be a bad approximation in tests
>>>> because it is set to 32 to increase concurrency.
>>>>
>>>>
>>>> On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins <robbin...@gmail.com>
>>>> wrote:
>>>>
>>>>> Oops... I meant to say "The page size calculation is NOT the issue
>>>>> here"
>>>>>
>>>>> On 16 September 2015 at 06:46, Pete Robbins <robbin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The page size calculation is the issue here as there is plenty of
>>>>>> free memory, although there is maybe a fair bit of wasted space in
>>>>>> some pages. It is that when we have a lot of tasks, each is only
>>>>>> allowed to reach 1/n of the available memory, and several of the tasks
>>>>>> bump into that limit. With tasks at 4 times the number of cores there
>>>>>> will be some contention, and so they remain active for longer.
>>>>>>
>>>>>> So I think this is a test-configuration issue: the number of executor
>>>>>> cores is set too high.
>>>>>>
>>>>>> On 15 September 2015 at 18:54, Reynold Xin <r...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Maybe we can change the heuristic in the memory calculation to use
>>>>>>> SparkContext.defaultParallelism when running in local mode.
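>>>>>>>
>>>>>>> i.e. something along these lines (sketch only; assumes the page size
>>>>>>> calculation can reach the SparkContext):
>>>>>>>
>>>>>>> // Hypothetical: prefer defaultParallelism to hardware cores in local mode
>>>>>>> val numSlots =
>>>>>>>   if (sc.isLocal) sc.defaultParallelism
>>>>>>>   else Runtime.getRuntime.availableProcessors()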
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins <robbin...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, and at least there is an override by setting
>>>>>>>> spark.sql.test.master to local[8]; in fact local[16] worked on my
>>>>>>>> 8-core box.
>>>>>>>>
>>>>>>>> I'm happy to use this as a workaround, but the hard-coded 32 will
>>>>>>>> make the build/tests fail on a clean checkout if you only have 8
>>>>>>>> cores.
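>>>>>>>>
>>>>>>>> For example (assuming the forked test JVM inherits system properties
>>>>>>>> from the maven command line), something like:
>>>>>>>>
>>>>>>>> build/mvn -Dspark.sql.test.master=local[8] -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test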
>>>>>>>>
>>>>>>>> On 15 September 2015 at 17:40, Marcelo Vanzin <van...@cloudera.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That test explicitly sets the number of executor cores to 32.
>>>>>>>>>
>>>>>>>>> object TestHive
>>>>>>>>>   extends TestHiveContext(
>>>>>>>>>     new SparkContext(
>>>>>>>>>       System.getProperty("spark.sql.test.master", "local[32]"),
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin <r...@databricks.com>
>>>>>>>>> wrote:
>>>>>>>>> > Yea I think this is where the heuristic is failing -- it uses 8
>>>>>>>>> > cores to approximate the number of active tasks, but the tests are
>>>>>>>>> > somehow using 32 (maybe because the test explicitly sets it to
>>>>>>>>> > that, or you set it yourself? I'm not sure which)
>>>>>>>>> >
>>>>>>>>> > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins <robbin...@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> Reynold, thanks for replying.
>>>>>>>>> >>
>>>>>>>>> >> getPageSize parameters: maxMemory=515396075, numCores=0
>>>>>>>>> >> Calculated values: cores=8, default=4194304
>>>>>>>>> >>
>>>>>>>>> >> So am I getting a large page size as I only have 8 cores?
>>>>>>>>> >>
>>>>>>>>> >> On 15 September 2015 at 00:40, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Pete - can you do me a favor?
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>>> >>>
>>>>>>>>> >>> Print the parameters that are passed into the getPageSize
>>>>>>>>> >>> function, and check their values.
>>>>>>>>> >>>
>>>>>>>>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin <r...@databricks.com> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>> Is this on latest master / branch-1.5?
>>>>>>>>> >>>>
>>>>>>>>> >>>> Out of the box we reserve only 16% (0.2 * 0.8) of the memory
>>>>>>>>> >>>> for execution (e.g. aggregate, join) / shuffle sorting. With a
>>>>>>>>> >>>> 3GB heap, that's 480MB. So each task gets 480MB / 32 = 15MB,
>>>>>>>>> >>>> and each operator reserves at least one page for execution. If
>>>>>>>>> >>>> your page size is 4MB, it only takes 3 operators to use up its
>>>>>>>>> >>>> memory.
>>>>>>>>> >>>>
>>>>>>>>> >>>> The thing is, page size is dynamically determined -- and in
>>>>>>>>> >>>> your case it should be smaller than 4MB.
>>>>>>>>> >>>>
>>>>>>>>> >>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>>>>>>>> >>>>
>>>>>>>>> >>>> Maybe there is a place in the maven tests where we explicitly
>>>>>>>>> >>>> set the page size (spark.buffer.pageSize) to 4MB? If yes, we
>>>>>>>>> >>>> need to find it and just remove it.
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins <robbin...@gmail.com> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I keep hitting errors running the tests on 1.5 such as
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> - join31 *** FAILED ***
>>>>>>>>> >>>>>   Failed to execute query using catalyst:
>>>>>>>>> >>>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0 failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes of memory
>>>>>>>>> >>>>>       at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> This is using the command:
>>>>>>>>> >>>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test
>>>>>>>>> >>>>>
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I don't see these errors in any of the amplab jenkins builds.
>>>>>>>>> >>>>> Do those builds have any configuration/environment that I may
>>>>>>>>> >>>>> be missing? My build is running with whatever defaults are in
>>>>>>>>> >>>>> the top-level pom.xml, e.g. -Xmx3G.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> I can make these tests pass by setting
>>>>>>>>> >>>>> spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite
>>>>>>>>> >>>>> rather than the default 0.2 value.
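>>>>>>>>> >>>>>
>>>>>>>>> >>>>> (i.e., hypothetically, something like
>>>>>>>>> >>>>> .set("spark.shuffle.memoryFraction", "0.6") on the conf of the
>>>>>>>>> >>>>> SparkContext the suite creates.)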
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Analyzing what is going on with the test: it is related to
>>>>>>>>> >>>>> the number of active tasks, which seems to rise to 32, so the
>>>>>>>>> >>>>> ShuffleMemoryManager allows less memory per task even though
>>>>>>>>> >>>>> most of those tasks have no memory allocated to them.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Has anyone seen issues like this before?
>>>>>>>>> >>>>
>>>>>>>>> >>>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Marcelo
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
