[
https://issues.apache.org/jira/browse/FLINK-9442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Metzger updated FLINK-9442:
----------------------------------
Component/s: Runtime / Coordination
> Flink Scaling not working
> -------------------------
>
> Key: FLINK-9442
> URL: https://issues.apache.org/jira/browse/FLINK-9442
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.4.2
> Reporter: swy
> Priority: Major
>
> Hi,
>
> We are in the middle of testing the scaling ability of Flink, but we found that
> scaling does not work, no matter whether we add more slots or increase the number
> of Task Managers. We would expect linear, or at least close-to-linear, scaling
> performance, but the results even show degradation. Any comments are appreciated.
>
> Test Details,
>
> - VMware vSphere
> - Just a simple pass-through test:
> - auto-generated source, 3 million records, each 1 KB in size, parallelism = 1
> - the source passes into the next map operator, which just returns the same record
> and sends a counter to statsD; map parallelism is 2, 4 or 6 depending on the case
> (see the parallelism sketch after the results below)
> - 3 TMs, 6 slots in total (2 per TM); each JM/TM has 32 vCPUs and 100 GB memory
> - Result:
> - 2 slots: 26 seconds, 3mil/26=115k TPS
> - 4 slots: 23 seconds, 3mil/23=130k TPS
> - 6 slots: 22 seconds, 3mil/22=136k TPS
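>
> For illustration, a minimal sketch of how the parallelism values above could be
> applied, reusing the datagen and passthru classes shown below (the mapParallelism
> variable is a placeholder for 2, 4 or 6 and is not part of the actual job code):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<String> stream = env
>         .addSource(new datagen(loop))
>         .setParallelism(1)       // source kept at parallelism 1, as in the test
>         .rebalance();            // redistribute records round-robin to the map subtasks
> stream.map(new passthru(statsdUrl))
>         .setParallelism(mapParallelism);  // 2, 4 or 6 depending on the test case
> env.execute("Flink");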
>
> As shown, the scaling gain is almost nothing, and throughput is capped at ~120k TPS.
> Any clue? Thanks.
>
>
>
> public class passthru extends RichMapFunction<String, String> {
>     public void open(Configuration configuration) throws Exception {
>         ... ...
>         stats = new NonBlockingStatsDClient();
>     }
>     public String map(String value) throws Exception {
>         ... ...
>         stats.increment();
>         return value;
>     }
> }
> public class datagen extends RichSourceFunction<String> {
>     ... ...
>     public void run(SourceContext<String> ctx) throws Exception {
>         int i = 0;
>         while (run) {
>             String idx = String.format("%09d", i);
>             ctx.collect("{\"<a 1kb json content with idx in certain json field>\"}");
>             i++;
>             if (i == loop)
>                 run = false;
>         }
>     }
>     ... ...
> }
> public class Job {
>     public static void main(String[] args) throws Exception {
>         ... ...
>         DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
>         DataStream<String> convert = stream.map(new passthru(statsdUrl));
>         env.execute("Flink");
>     }
> }
> The reason for this sample test is that the Kafka source FlinkKafkaConsumer011
> faces the same issue and does not scale. FlinkKafkaConsumer011 already uses
> RichParallelSourceFunction, and we always set the number of Kafka partitions
> equal to the total number of TM slots, but the result is still capped and does
> not improve linearly.
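>
> For reference, a minimal sketch of that Kafka-based setup (the broker list, group
> id, topic name and the kafkaPartitions variable are placeholders, not the actual
> test values):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder broker list
> props.setProperty("group.id", "scaling-test");          // placeholder group id
> FlinkKafkaConsumer011<String> consumer =
>         new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), props);
> DataStream<String> stream = env
>         .addSource(consumer)
>         .setParallelism(kafkaPartitions);   // one consumer subtask per Kafka partition
> stream.map(new passthru(statsdUrl))
>         .setParallelism(kafkaPartitions);
> env.execute("Flink");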
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)