Hi everyone,


My name is Alex and I've been using Spark for the past 4 years to solve
most, if not all, of my data processing challenges. From time to time I go
a bit left field with this :), like embedding Spark in my JVM-based
application running only in `local` mode and using it as a real-time
analytics and on-the-fly data processing tool for what I would call `micro
OLAP`. The applications I am referring to are long-running, interactive
and user-facing. Now, before you call me a nutcase and tell me `you
shouldn't do this`, I would like to explain why I love this approach so
much.



First, there is Dataset and the general abstraction it gives me to load
data from anywhere: reading Parquet files from local storage or S3,
pulling data from a JDBC-compliant database, or any other exotic
persistence layer. I can write my application in such a way that where
the data comes from is just configuration. Once the data sources are
configured, the only interaction required in code is through Datasets. If
the data is migrated to another kind of storage system or database, it
mostly boils down to adjusting configuration rather than code.
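
To make that concrete, here is a rough sketch of what I mean. The
`source.*` config keys and the `loadConfigured` helper are names I just
made up for illustration, not anything Spark provides:

    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    // Hypothetical helper: the storage backend is decided purely by
    // configuration, the rest of the application only ever sees a Dataset.
    def loadConfigured(spark: SparkSession, conf: Map[String, String]): Dataset[Row] =
      conf("source.format") match {
        case "parquet" =>
          spark.read.parquet(conf("source.path"))   // local path or s3a:// URI
        case "jdbc" =>
          spark.read.format("jdbc")
            .option("url", conf("source.url"))
            .option("dbtable", conf("source.table"))
            .load()
        case other =>
          spark.read.format(other)                  // any other data source
            .options(conf - "source.format")
            .load()
      }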



Then there is the ease and power of the DSL that can be applied to those
Datasets. I can often reduce the business logic in my applications to
(see the sketch right after this list):

   - fetch datasets from different data sources
   - join them
   - apply transformations on top of them
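
A hedged example of that pattern, reusing the hypothetical
`loadConfigured` helper from above plus two invented configs
(`ordersConf`, `customersConf`):

    import org.apache.spark.sql.functions.sum

    // Fetch two datasets from already configured sources, join them,
    // apply transformations on top.
    val orders    = loadConfigured(spark, ordersConf)
    val customers = loadConfigured(spark, customersConf)

    val revenuePerCountry = orders
      .join(customers, "customer_id")
      .groupBy("country")
      .agg(sum("amount").as("revenue"))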



Expressing this with Datasets and the DSL, and letting Spark build a query
plan and fully utilize the available resources, works out nicely for me,
to a certain extent :) (I'll come back to that in a second).



So I am in a situation where, when a user interacts with a certain part of
my application, it actually triggers a micro Spark job, confined to that
same single JVM serving the application, which collects the results and
prepares them for presentation.
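
In simplified form, the per-request flow looks roughly like this (the
`handleRequest` name and the column names are of course invented, and it
leans on the hypothetical `loadConfigured` helper from above):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions.col

    // One long-lived local-mode session, shared by the whole application.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("micro-olap")
      .getOrCreate()

    // Called from a user-facing request handler: a `micro` Spark job that
    // runs inside the same JVM and collects a small result for presentation.
    def handleRequest(region: String): Array[Row] =
      loadConfigured(spark, ordersConf)
        .filter(col("region") === region)
        .groupBy("product")
        .count()
        .collect()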



Now, in the current era where everything is going cloud native/agnostic
and runs on Kubernetes, most of the pods running my JVM applications have
very little in the way of resources. Think 2 CPUs and 1.5 GB of RAM. This
is where things work only `to a certain extent`.
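
For reference, this is the kind of session configuration I end up with on
such a pod (the values are just my own, not a recommendation):

    val spark = SparkSession.builder()
      .master("local[2]")                           // match the 2 CPUs the pod gets
      .config("spark.sql.shuffle.partitions", "4")  // keep shuffles tiny
      .config("spark.ui.enabled", "false")          // no Spark UI in-process
      .getOrCreate()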



I started to notice performance issues in those resource-restricted
environments, even after tuning all the publicly documented parameters I
know of. Eventually I took my debugger and the Spark source code to figure
out why performance was degrading. After some time I found that a number
of thread pools are created which are not configurable and which, I'd say,
have rather strange defaults from the start.



   - BlockManagerStorageEndpoint starts a cached thread pool with a
   maximum of 100 threads
   - BlockManagerMasterEndpoint starts a cached thread pool with a
   maximum of 100 threads
   - LocalSchedulerBackend starts an executor using
   Executors.newCachedThreadPool which, to my understanding, allows the
   number of threads to grow up to Integer.MAX_VALUE
   - MapOutputTracker creates a fixed thread pool based on a configurable
   parameter (oh yes, epic win), but no keepAliveTime is set, so I think
   the threads simply stay alive until the pool is shut down
   - AdaptiveSparkPlanExec uses a cached thread pool with a fixed cap of
   16 threads, which seems a bit arbitrary?
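
For context, the numbers above line up with the plain JDK behaviour behind
those factory methods (this is just java.util.concurrent, not Spark code):

    import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}

    // newCachedThreadPool(): core size 0, max size Integer.MAX_VALUE,
    // 60 second keep-alive.
    val cached = Executors.newCachedThreadPool().asInstanceOf[ThreadPoolExecutor]
    println(cached.getMaximumPoolSize)                   // 2147483647
    println(cached.getKeepAliveTime(TimeUnit.SECONDS))   // 60

    // newFixedThreadPool(n): core == max == n, keep-alive 0, so idle
    // threads are never reclaimed until the pool is shut down.
    val fixed = Executors.newFixedThreadPool(8).asInstanceOf[ThreadPoolExecutor]
    println(fixed.getKeepAliveTime(TimeUnit.SECONDS))    // 0

    cached.shutdown(); fixed.shutdown()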



As soon as Spark bootstraps, somewhere between 200 and 300 threads are
created, degrading the performance of the poor JVM in my situation. With
keep-alive times set to 60 seconds by default, it also takes a while for
things to settle.
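
For what it is worth, this is roughly how I counted them, straight from
the JDK, nothing Spark-specific:

    import java.lang.management.ManagementFactory

    // Live and peak thread counts of the JVM that embeds Spark.
    val threads = ManagementFactory.getThreadMXBean
    println(s"live: ${threads.getThreadCount}, peak: ${threads.getPeakThreadCount}")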



Now, given that I am using Spark in an unorthodox way, I would like to
hear your opinions and words of wisdom on making all thread pools used
inside Spark configurable: keep backwards compatibility by leaving the
defaults as they are today, but make it possible to set (a rough sketch
follows after this list):

   - Core threads (same as max threads)
   - Queue limit
   - Keep alive time
   - Whether core threads are allowed to time out
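
Concretely, I am imagining something along these lines for each pool; just
a sketch, the method name and the idea of wiring it to spark.* config keys
are mine:

    import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

    // Hypothetical: build a pool from four configurable values instead of
    // the current hard-coded defaults.
    def configuredPool(coreThreads: Int,
                       queueLimit: Int,
                       keepAliveSeconds: Long,
                       allowCoreTimeout: Boolean): ThreadPoolExecutor = {
      val pool = new ThreadPoolExecutor(
        coreThreads, coreThreads,                       // core threads (same as max)
        keepAliveSeconds, TimeUnit.SECONDS,             // keep alive time
        new LinkedBlockingQueue[Runnable](queueLimit))  // queue limit
      pool.allowCoreThreadTimeOut(allowCoreTimeout)     // allow core threads to time out
      pool
    }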



I am more than happy to help with PRs for these changes, but I need
guidance because I am not acquainted with the codebase.



Kind regards and looking forward to your reply,

Alex
