Hi Peter,

I actually meant the Spark configuration that you pass to your spark-submit
command (such as --conf spark.executor.instances=..., --conf
spark.executor.memory=..., etc.).
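For example, the flags might look something like this (the values here just
mirror the 50 executors x 2 vcores x 3g you described; the class name and jar
path are placeholders):

    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --conf spark.executor.instances=50 \
      --conf spark.executor.cores=2 \
      --conf spark.executor.memory=3g \
      --class your.streaming.MainClass your-app.jar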

I advise you to check the number of partitions you get in each stage of your
workload in the Spark UI while the workload is running. I suspect this number
is beyond 80, which would explain why overcommitting CPU cores can achieve
better latency when the workload is not CPU intensive.
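If you are on Structured Streaming (as the code in (a) suggests), the shuffle
stages use spark.sql.shuffle.partitions, which defaults to 200 unless you
override it, for example with (the value 100 here is only illustrative):

    --conf spark.sql.shuffle.partitions=100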

Another question: did you try different values for spark.executor.cores (for
example 3, 4 or 5 cores per executor, in addition to 2)? Try to play a little
bit with this parameter and check how it affects your latency.
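For instance, keeping the total around 100 vcores, you could compare runs such
as these (the memory values are only illustrative):

    --conf spark.executor.instances=25 --conf spark.executor.cores=4 --conf spark.executor.memory=6g
    --conf spark.executor.instances=20 --conf spark.executor.cores=5 --conf spark.executor.memory=7g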

Best,

Khaled



On Tue, Oct 16, 2018 at 3:06 AM Peter Liu <peter.p...@gmail.com> wrote:

> Hi Khaled,
>
> I have attached the Spark streaming code below in (a).
> In the case of the 100 vcore run (see the initial email), I used 50 executors
> where each executor has 2 vcores and 3g of memory. For the 70 vcore case, 35
> executors; for the 80 vcore case, 40 executors.
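> (That is, 50 x 2 = 100, 35 x 2 = 70 and 40 x 2 = 80 vcores requested in
> total, on a machine with 80 physical vcores.)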
> In the yarn config (yarn-site.xml, (b) below), the available vcores are set
> above 80 (I call it "overcommit").
>
> Not sure if there is a more proper way to do this (overcommit) and what the
> best practice would be in this type of situation (say, a light CPU workload on
> a dedicated yarn cluster) to increase the CPU utilization for better
> performance.
>
> Any help would be very much appreciated.
>
> Thanks ...
>
> Peter
>
> (a)
>
>    val df = spark
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
>       .option("startingOffsets", "latest")
>       .option("subscribe", Variables.EVENTS_TOPIC)
>       .option("kafkaConsumer.pollTimeoutMs", "5000")
>       .load()
>       .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
>       .select(from_json($"value", mySchema).as("data"), $"timestamp")
>       .select("data.*", "timestamp")
>       .where($"event_type" === "view")
>       .select($"ad_id", $"event_time")
>       .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
>       .groupBy(millisTime(window($"event_time", "10 seconds").getField("start")) as 'time_window, $"campaign_id")
>       .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
>       .select(to_json(struct("*")) as 'value)
>       .writeStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString) //original
>       .option("topic", Variables.OUTPUT_TOPIC)
>       .option("checkpointLocation", s"/tmp/${java.util.UUID.randomUUID()}") //TBD: ram disk?
>       .outputMode("update")
>       .start()
>
> (b)
> <property>
>     <name>yarn.nodemanager.resource.cpu-vcores</name>
>     <value>110</value>
> </property>
> <property>
>     <name>yarn.scheduler.maximum-allocation-vcores</name>
>     <value>110</value>
> </property>
>
> On Mon, Oct 15, 2018 at 4:26 PM Khaled Zaouk <khaledz...@gmail.com> wrote:
>
>> Hi Peter,
>>
>> What parameters are you putting in your Spark streaming configuration? How
>> many executor instances and how many cores per executor are you setting in
>> your Spark job?
>>
>> Best,
>>
>> Khaled
>>
>> On Mon, Oct 15, 2018 at 9:18 PM Peter Liu <peter.p...@gmail.com> wrote:
>>
>>> Hi there,
>>>
>>> I have a system with 80 vcores and a relatively light Spark streaming
>>> workload. Overcommitting the vcore resource (i.e. > 80) in the config (see
>>> (a) below) seems to help improve the average Spark batch time (see (b)
>>> below).
>>>
>>> Is there any best practice guideline on resource overcommit with CPU /
>>> vcores, such as yarn config options, candidate cases ideal for
>>> overcommitting vcores, etc.?
>>>
>>> The slides below (from 2016 though) seem to address the memory overcommit
>>> topic and hint at a "future" topic on CPU overcommit:
>>>
>>> https://www.slideshare.net/HadoopSummit/investing-the-effects-of-overcommitting-yarn-resources
>>>
>>> Would like to know if this is a reasonable config practice and why this
>>> is not achievable without overcommit. Any help/hint would be very much
>>> appreciated!
>>>
>>> Thanks!
>>>
>>> Peter
>>>
>>> (a) yarn-site.xml
>>> <property>
>>>     <name>yarn.nodemanager.resource.cpu-vcores</name>
>>>     <value>110</value>
>>> </property>
>>>
>>> <property>
>>>     <name>yarn.scheduler.maximum-allocation-vcores</name>
>>>     <value>110</value>
>>> </property>
>>>
>>>
>>> (b)
>>> FYI:
>>> I have a system with 80 vcores and a relatively light Spark streaming
>>> workload. Overcommitting the vcore resource (here 100) seems to help the
>>> average Spark batch time; I need more understanding of this practice.
>>>
>>> Skylake (1 x 900K msg/sec) | total batch# (avg) | avg batch time in ms (avg) | avg user cpu (%) | nw read (mb/sec)
>>> 70 vcores                  | 178.20             | 8154.69                    | n/a              | n/a
>>> 80 vcores                  | 177.40             | 7865.44                    | 27.85            | 222.31
>>> 100 vcores                 | 177.00             | 7209.37                    | 30.02            | 220.86
>>>
>>>
