[
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193547#comment-15193547
]
Ioannis Deligiannis edited comment on SPARK-13718 at 3/14/16 4:40 PM:
----------------------------------------------------------------------
Our solution can serve up to 8 user requests per second with a constant
latency of less than 2 seconds. If user requests increase, throughput
increases to ~16 users per second with latency just above 6 seconds. This
performance scales linearly when the described issue does not materialize.
So this translates to ~240 users per minute at constant latency, or 480 users
per minute at higher latency (but less than 10 seconds).
PS. As I mentioned above, these are not exactly point queries, so each
aggregation typically works on less than 2% of the data. (--Removed bit as it
was confusing/misleading--)
was (Author: jid1):
Our solution can serve up to 8 user requests per second with a constant
latency of less than 2 seconds. If user requests increase, throughput
increases to ~16 users per second with latency just above 6 seconds. This
performance scales linearly when the described issue does not materialize.
So this translates to ~240 users per minute at constant latency, or 480 users
per minute at higher latency (but less than 10 seconds).
PS. As I mentioned above, these are not exactly point queries, so each
aggregation typically works on less than 2% of the data. Also note that in
practice the 'long operations' are not DNA analysis :) but part of Spark's Kryo
serialization and compression of the cached RDD partitions.
> Scheduler "creating" straggler node
> ------------------------------------
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler, Spark Core
> Affects Versions: 1.3.1
> Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 GB RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
> Reporter: Ioannis Deligiannis
> Priority: Minor
> Attachments: TestIssue.java, spark_struggler.jpg
>
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication
> factor of 3.
> * In-memory data are partitioned into 128 chunks (384 cores in total, i.e. 3
> requests can be executed concurrently(-ish)).
> *Action:*
> * Action is a simple sequence of map/filter/reduce (a rough sketch follows
> below).
> * The action operates upon and returns a small subset of data (following the
> full map over the data).
> * Data are cached once, serialized in memory (Kryo), so calling the action
> should not hit the disk under normal conditions.
> * Action network usage is low, as it returns a small number of aggregated
> results and does not require excessive shuffling.
> * Under low or moderate load, each action is expected to complete in less
> than 2 seconds.
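> For illustration only, a rough Scala sketch of this kind of job; the record
> type, field names, input path and aggregation are placeholders, not the
> actual workload:
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.storage.StorageLevel
>
> // Hypothetical record type standing in for the real domain objects.
> case class Record(key: String, group: Int, value: Double)
>
> val conf = new SparkConf()
>   .setAppName("CachedAggregation")
>   // Kryo serialization for the serialized in-memory cache described above.
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> val sc = new SparkContext(conf)
>
> // Load once, repartition into 128 chunks and cache serialized in memory,
> // so the action should not hit the disk under normal conditions.
> val records = sc.objectFile[Record]("maprfs:///data/records")  // placeholder path
>   .repartition(128)
>   .persist(StorageLevel.MEMORY_ONLY_SER)
>
> // The action: a full map/filter over the cached data, reduced to a small
> // aggregate returned to the driver (little shuffling, small result).
> def aggregate(group: Int): Double =
>   records.filter(_.group == group)
>          .map(_.value)
>          .reduce(_ + _)
> {code}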
> *H/W Outlook*
> When the action is invoked at a high rate, the cluster CPU initially gets
> close to 100% (which is expected & intended).
> After a while, cluster utilization drops significantly, with only one
> (straggler) node at 100% CPU and a fully utilized network.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O
> issues; everything is normal under both low and heavy usage.
> 2. Cluster network usage is very low.
> 3. No issues on the Spark UI, except that tasks begin to move from LOCAL to
> ANY.
> *Cause (corrected after finding the details in the code):*
> 1. Node 'H' is doing marginally more work than the rest (being a little
> slower and at almost 100% CPU).
> 2. The scheduler hits the default 3000 ms 'spark.locality.wait' timeout and
> assigns the task to other nodes.
> 3. One of the nodes, 'X', that accepted the task will try to read the data
> from the disk of node 'H'. This adds network I/O to node 'H' and also some
> extra CPU for I/O.
> 4. The time for 'X' to complete increases ~5x as the data goes over the
> network.
> 5. Eventually, every node has a task that is waiting to fetch that specific
> partition from node 'H', so the cluster is basically blocked on a single
> node.
> What I managed to figure out from the code is that this happens because, if
> an RDD is cached, a task will use BlockManager.getRemote() rather than
> recompute the part of the DAG that produced the RDD, and will therefore
> always hit the node that has cached the partition.
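> As a self-contained toy model of that decision (NOT the actual Spark source,
> just a sketch of the behaviour described above):
> {code:scala}
> // Toy model of the behaviour described above -- NOT the actual Spark source.
> sealed trait BlockSource
> case object LocalCache  extends BlockSource  // partition cached on this executor
> case object RemoteCache extends BlockSource  // partition cached on another node (node 'H')
> case object Recompute   extends BlockSource  // recompute the partition from its lineage
>
> // Today a remotely cached block is always fetched over the network; the
> // lineage is only recomputed when the block is not cached anywhere.
> def resolveBlock(cachedLocally: Boolean, cachedRemotely: Boolean): BlockSource =
>   if (cachedLocally) LocalCache
>   else if (cachedRemotely) RemoteCache  // this is what piles load onto node 'H'
>   else Recompute
> {code}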
> *Proposed Fix*
> I have not worked with the Scala & Spark source code enough to propose a
> code fix, but on a high level, when a task hits the 'spark.locality.wait'
> timeout, it could make use of a new configuration, e.g.
> 'recomputeRddAfterLocalityTimeout', instead of always trying to fetch the
> cached RDD block remotely. This would be very useful if it could also be set
> manually on the RDD.
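> Purely to illustrate the intent, extending the toy model above (neither this
> flag nor any such configuration exists in Spark; it only sketches the
> proposal):
> {code:scala}
> // Hypothetical extension of the toy model above -- the flag below does not
> // exist in Spark; it only sketches the proposed 'recomputeRddAfterLocalityTimeout'.
> def resolveBlockWithFallback(cachedLocally: Boolean,
>                              cachedRemotely: Boolean,
>                              localityWaitExpired: Boolean,
>                              recomputeAfterLocalityTimeout: Boolean): BlockSource =
>   if (cachedLocally) LocalCache
>   else if (cachedRemotely && !(localityWaitExpired && recomputeAfterLocalityTimeout))
>     RemoteCache
>   else
>     Recompute  // a task that lost locality recomputes instead of hammering node 'H'
> {code}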
> *Workaround*
> Playing with the 'spark.locality.wait' value, there is a deterministic value,
> depending on the partitions and configuration, at which the problem ceases to
> exist.
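> For example (the 30000 ms figure is only illustrative; the right value
> depends on the partition count and cluster configuration, as noted above):
> {code:scala}
> // Raising the locality wait keeps tasks NODE_LOCAL for longer, so they queue
> // on the caching node instead of falling back to ANY and fetching the block
> // over the network. Values are in milliseconds; the default is 3000.
> val conf = new SparkConf()
>   .set("spark.locality.wait", "30000")       // illustrative value only
>   .set("spark.locality.wait.node", "30000")  // can also be tuned per locality level
> {code}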
> *PS1*: I don't have enough Scala skills to follow the issue or propose a fix,
> but I hope that this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of
> use cases that trigger this behaviour.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]