[ 
https://issues.apache.org/jira/browse/BEAM-2803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148145#comment-16148145
 ] 

Eugene Kirpichov commented on BEAM-2803:
----------------------------------------

Side inputs (at least in Dataflow) have the disadvantage that they are materialized
via Avro, which has pretty high granularity of reading parallelism (unlike
shuffle). I'm currently running some experiments to compare three approaches:
shuffle, side input, and side input THEN shuffle (the latter may be good
because we would then be shuffling data that's already distributed across
multiple workers).

> JdbcIO read is very slow when query return a lot of rows
> --------------------------------------------------------
>
>                 Key: BEAM-2803
>                 URL: https://issues.apache.org/jira/browse/BEAM-2803
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>    Affects Versions: Not applicable
>            Reporter: Jérémie Vexiau
>            Assignee: Jean-Baptiste Onofré
>              Labels: performance
>             Fix For: 2.2.0
>
>         Attachments: test1500K.png, test1M.png, test2M.jpg, test500k.png
>
>
> Hi,
> I'm using the JdbcIO reader in batch mode with the PostgreSQL driver.
> My SELECT query returns more than 5 million rows,
> using cursors via Statement.setFetchSize().
> These ParDo steps are fine:
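> {code:java}
>           // Run the query and emit one element per row
>           .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder())
>           // Pair each row with a random Integer key so that the following
>           // GroupByKey redistributes rows across workers (breaking fusion)
>           .apply(ParDo.of(new DoFn<T, KV<Integer, T>>() {
>             private Random random;
>             @Setup
>             public void setup() {
>               random = new Random();
>             }
>             @ProcessElement
>             public void processElement(ProcessContext context) {
>               context.output(KV.of(random.nextInt(), context.element()));
>             }
>           }))
> {code}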
> But the reshuffle is very, very slow.
> It must be the GroupByKey over more than 5 million keys:
> {code:java}
> .apply(GroupByKey.<Integer, T>create())
> {code}
> Is there a way to optimize the reshuffle, or another method to prevent
> fusion?
> Thanks in advance,
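The reporter's second DoFn keys each row with `random.nextInt()`, which draws from the full `int` range, so the GroupByKey sees roughly one distinct key per row. The stdlib-only Java sketch below is not Beam code; the class and method names are hypothetical. It only counts distinct keys for a given element count and contrasts that with keys drawn from an assumed fixed number of shards. It illustrates key cardinality; it does not show that fewer keys would make the Dataflow shuffle faster.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical illustration (not Beam code): how many distinct keys does a
// random-key assignment produce for a given number of elements?
final class RandomKeyCardinality {

    // Mirrors the reporter's DoFn: each element gets random.nextInt(),
    // i.e. a key from the full 32-bit int range.
    static int distinctUnboundedKeys(int numElements, long seed) {
        Random random = new Random(seed);
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < numElements; i++) {
            keys.add(random.nextInt());
        }
        return keys.size();
    }

    // Assumed alternative for comparison: keys drawn from [0, numShards).
    static int distinctBoundedKeys(int numElements, int numShards, long seed) {
        Random random = new Random(seed);
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < numElements; i++) {
            keys.add(random.nextInt(numShards));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        int n = 500_000;
        System.out.println("unbounded keys: " + distinctUnboundedKeys(n, 42L));
        System.out.println("bounded keys:   " + distinctBoundedKeys(n, 1_000, 42L));
    }
}
```

With unbounded keys the count stays within a few dozen of the element count (only rare birthday collisions merge rows), whereas a bounded range caps it at the shard count, with correspondingly larger groups per key.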
> Edit:
> I've added some tests.
> I use Google Dataflow as the runner, with 1 worker (2 max), workerMachineType
> n1-standard-2,
> and autoscalingAlgorithm THROUGHPUT_BASED.
> 1st test: the query returns 500,000 results:
> !test500k.png!
> As we can see,
> ParDo(Read) => about 1300 r/s
> GroupByKey => about 1080 r/s
> 2nd test: the query returns 1,000,000 results:
> !test1M.png!
> ParDo(Read) => 1480 r/s
> GroupByKey => 634 r/s
> 3rd test: the query returns 1,500,000 results:
> !test1500K.png!
> ParDo(Read) => 1700 r/s
> GroupByKey => 565 r/s
> 4th test: the query returns 2,000,000 results:
> !test2M.jpg!
> ParDo(Read) => 1485 r/s
> GroupByKey => 537 r/s
> As we can see, the GroupByKey rate decreases as the number of records
> grows.
> PS: the 2nd worker starts just after ParDo(Read) succeeds.
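To put the four tests side by side, the short Java sketch below (stdlib only; class and field names are hypothetical) takes the reported rates as given and computes the GroupByKey rate as a fraction of the ParDo(Read) rate, plus the duration each stage would imply if it held that rate throughout. The duration is an estimate under that assumption, not a measurement.

```java
import java.util.List;

// Reporter's measurements from the four tests, in r/s as reported.
final class ReshuffleThroughput {

    static final class Run {
        final long rows;        // rows returned by the query
        final double readRate;  // ParDo(Read) rate
        final double gbkRate;   // GroupByKey rate

        Run(long rows, double readRate, double gbkRate) {
            this.rows = rows;
            this.readRate = readRate;
            this.gbkRate = gbkRate;
        }

        // GroupByKey rate as a fraction of the ParDo(Read) rate.
        double gbkToReadRatio() {
            return gbkRate / readRate;
        }
    }

    static final List<Run> RUNS = List.of(
            new Run(500_000, 1300, 1080),
            new Run(1_000_000, 1480, 634),
            new Run(1_500_000, 1700, 565),
            new Run(2_000_000, 1485, 537));

    // Seconds a stage would need if it ran at `rate` for its whole duration.
    // Assumption: the charts report throughput, not stage wall-clock time.
    static double impliedSeconds(long rows, double rate) {
        return rows / rate;
    }

    public static void main(String[] args) {
        for (Run r : RUNS) {
            System.out.printf("%,d rows: GBK/read = %.2f, read ~%.0f s, GBK ~%.0f s%n",
                    r.rows, r.gbkToReadRatio(),
                    impliedSeconds(r.rows, r.readRate),
                    impliedSeconds(r.rows, r.gbkRate));
        }
    }
}
```

With these numbers the absolute GroupByKey rate falls at every step (1080, 634, 565, 537 r/s), while the GroupByKey/read ratio goes 0.83, 0.43, 0.33, 0.36, so the ratio is not strictly monotone. Since the 2nd worker started only after ParDo(Read), the two stages may not have run on the same number of workers, and none of these figures are normalized per worker.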



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
