So if I run 5 workers with 50 shards, I end up with:

Duration   Bytes received   Records received
2m 39s     900 MB           465,525
2m 39s     1.76 GB          930,720
2m 39s     789 MB           407,315
2m 39s     1.32 GB          698,262
2m 39s     788 MB           407,310
Still not good, but better than with 5 shards, where some workers did not
participate at all. So the problem is in some layer which distributes keys /
shards among workers?

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:

> withNumShards(5) generates 5 random shards. It turns out that
> statistically, when you generate 5 random shards and you have 5 workers,
> the probability is reasonably high that some workers will get more than
> one shard (and as a result not all workers will participate). Are you
> able to set the number of shards larger than 5?
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with FlinkRunner in batch mode and again
>> received a bad data spread among the workers.
>>
>> When I tried to remove the number of shards for batch mode in the above
>> example, the pipeline crashed before launch:
>>
>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>> incompatible triggers:
>> AfterWatermark.pastEndOfWindow()
>>   .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>   .withLateFirings(AfterFirst.of(
>>     Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>     Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>       .plusDelayOf(1 hour)))),
>> AfterWatermark.pastEndOfWindow()
>>   .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>>   .withLateFirings(AfterFirst.of(
>>     Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>>     Repeatedly.forever(AfterSynchronizedProcessingTime
>>       .pastFirstElementInPane())))
>>
>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> Hi Max,
>>>
>>> I forgot to mention that the example is run in streaming mode, therefore
>>> I cannot do writes without specifying shards. FileIO explicitly asks for
>>> them.
>>>
>>> I am not sure where the problem is. FlinkRunner is the only one I have
>>> used.
>>>
>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Hi Jozef,
>>>>
>>>> This does not look like a FlinkRunner-related problem, but is caused by
>>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle,
>>>> which apparently does not lead to a good data spread in your case.
>>>>
>>>> Do you see the same behavior without `withNumShards(5)`?
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>>> > Hello,
>>>> >
>>>> > I am having some trouble getting a balanced write via FileIO. Workers
>>>> > on the shuffle side, where the data fired per window are written to
>>>> > the filesystem, receive an unbalanced number of events.
>>>> >
>>>> > Here is a naive code example:
>>>> >
>>>> > val read = KafkaIO.read()
>>>> >   .withTopic("topic")
>>>> >   .withBootstrapServers("kafka1:9092")
>>>> >   .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>>> >   .withValueDeserializer(classOf[ByteArrayDeserializer])
>>>> >   .withProcessingTime()
>>>> >
>>>> > pipeline
>>>> >   .apply(read)
>>>> >   .apply(MapElements.via(
>>>> >     new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>>> >       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>>>> >         new String(input.getKV.getValue, "UTF-8")
>>>> >       }
>>>> >     }))
>>>> >   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>>> >     .triggering(AfterWatermark.pastEndOfWindow()
>>>> >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>>> >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>>> >         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>>> >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>>> >           .plusDelayOf(Duration.standardHours(1)))))))
>>>> >     .discardingFiredPanes()
>>>> >     .withAllowedLateness(Duration.standardDays(7)))
>>>> >   .apply(FileIO.write()
>>>> >     .via(TextIO.sink())
>>>> >     .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>>> >     .withTempDirectory(tempLocation)
>>>> >     .withNumShards(5))
>>>> >
>>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>>>> > the number of shards), I would expect each worker to participate in
>>>> > persisting shards equally, since the code uses a fixed number of
>>>> > shards (and random shard assignment?). But the reality is different
>>>> > (see the 2 attachments - statistics from the Flink task reading from
>>>> > Kafka and the task writing to files).
>>>> >
>>>> > What am I missing? How can I achieve balanced writes?
>>>> >
>>>> > Thanks,
>>>> > Jozef
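
Reuven's statistical point above is easy to check with a quick simulation.
Note also that the record counts in the 50-shard table divide almost exactly
into 8, 16, 7, 12 and 7 fiftieths of the total, which is consistent with 50
random shard keys landing unevenly on 5 workers. A minimal sketch in plain
Scala, assuming each shard key is assigned to a worker uniformly at random
(the runner's actual key partitioning may differ):

import scala.util.Random

object ShardSpreadSim {
  /** Fraction of trials in which at least one of `workers` gets no shard,
    * assuming each shard key lands on a uniformly random worker. */
  def idleWorkerProbability(shards: Int, workers: Int,
                            trials: Int = 100000): Double = {
    val rng = new Random()
    val hits = (1 to trials).count { _ =>
      Seq.fill(shards)(rng.nextInt(workers)).distinct.size < workers
    }
    hits.toDouble / trials
  }

  def main(args: Array[String]): Unit = {
    // 5 shards on 5 workers: some worker idle ~96% of the time
    // (exact: 1 - 5!/5^5 ≈ 0.962).
    println(f"5 shards,  5 workers: ${idleWorkerProbability(5, 5)}%.3f")
    // 50 shards on 5 workers: every worker virtually always gets work,
    // but per-worker shard counts still vary, as in the table above.
    println(f"50 shards, 5 workers: ${idleWorkerProbability(50, 5)}%.3f")
  }
}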
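
And for the `WriteFiles` sharding logic Max describes, here is a minimal
sketch of that mechanism as described (a simplification under assumptions,
not the actual WriteFiles source; `AssignRandomShard` is a hypothetical
name): each element is tagged with a random shard id, and a shuffle then
groups elements by that id, so a single worker ends up owning each shard.

import java.util.concurrent.ThreadLocalRandom

import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.{DoFn, GroupByKey, ParDo}
import org.apache.beam.sdk.values.KV

class AssignRandomShard(numShards: Int)
    extends DoFn[String, KV[Integer, String]] {

  @ProcessElement
  def process(c: DoFn[String, KV[Integer, String]]#ProcessContext): Unit = {
    // Tag the element with a random shard id in [0, numShards). Which
    // worker ends up owning a given shard id is decided later by the
    // runner's key partitioning, not here.
    val shard = ThreadLocalRandom.current().nextInt(numShards)
    c.output(KV.of(Int.box(shard), c.element()))
  }
}

// Conceptual usage, roughly what happens inside FileIO/WriteFiles:
//
//   elements
//     .apply(ParDo.of(new AssignRandomShard(5)))
//     .apply(GroupByKey.create[Integer, String]()) // one group per shard id
//     // ... each group is then written out as a single shard file

Since only numShards distinct keys exist after this step, worker balance is
decided entirely by how those few keys map onto workers, which is why raising
the shard count well above the worker count smooths the spread.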