Oh ok, thanks for the pointer. Coming from Flink, the default is that the sharding is determined by the runtime distribution. Indeed, we will have to add an override to the Flink Runner, similar to this one:

https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347

Jira issue: https://issues.apache.org/jira/browse/BEAM-5865

Thanks,
Max

On 25.10.18 22:37, Reuven Lax wrote:
FYI the Dataflow runner automatically sets the default number of shards (I believe to be 2 * num_workers). Probably we should do something similar for the Flink runner.

This needs to be done by the runner, as # of workers is a runner concept; the SDK itself has no concept of workers.

On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

    If I do not specify shards for an unbounded collection, I get

    Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly
             at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
             at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)

    Around the same lines in WriteFiles there is also a check for windowed writes.
    I believe FileIO enables them explicitly when windowing is present. On the
    filesystem, files are written per window and shard.

    On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <m...@apache.org> wrote:

        I agree it would be nice to keep the current distribution of elements
        instead of doing a shuffle based on an artificial shard key.

        Have you tried `withWindowedWrites()`? Also, why do you say you need to
        specify the number of shards in streaming mode?

        -Max

        On 25.10.18 10:12, Jozef Vilcek wrote:
         > Hm, yes, this makes sense now, but what can be done for my case? I do
         > not want to end up with too many files on disk.
         >
         > I think what I am looking for is a way to instruct the IO not to do
         > another random shard assignment and reshuffle, but to just assume the
         > number of shards equals the number of workers and the shard ID is the
         > worker ID. Is this doable in the Beam model?
         >
         > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <m...@apache.org> wrote:
         >
         >     The FlinkRunner uses a hash function (MurmurHash) on each key, which
         >     places keys somewhere in the hash space. The hash space (2^32) is split
         >     among the partitions (5 in your case). Given enough keys, the chance
         >     increases that they are equally spread.
         >
         >     This should be similar to what the other Runners do.
         >
         >     On 24.10.18 10:58, Jozef Vilcek wrote:
         >      >
         >      > So if I run 5 workers with 50 shards, I end up with:
         >      >
         >      > Duration   Bytes received   Records received
         >      >   2m 39s        900 MB            465,525
         >      >   2m 39s       1.76 GB            930,720
         >      >   2m 39s        789 MB            407,315
         >      >   2m 39s       1.32 GB            698,262
         >      >   2m 39s        788 MB            407,310
         >      >
         >      > Still not good, but better than with 5 shards, where some workers did
         >      > not participate at all.
         >      > So, is the problem in some layer which distributes keys / shards
         >      > among workers?
         >      >
         >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:
         >      >
         >      >     withNumShards(5) generates 5 random shards. It turns out that
         >      >     statistically, when you generate 5 random shards and you have 5
         >      >     workers, the probability is reasonably high that some workers will
         >      >     get more than one shard (and as a result not all workers will
         >      >     participate). Are you able to set the number of shards larger than 5?
         >      >
         >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
         >      >
         >      >         cc (dev)
         >      >
         >      >         I tried to run the example with FlinkRunner in batch mode and
         >      >         again got a bad data spread among the workers.
         >      >
         >      >         When I tried to remove the number of shards for batch mode in the
         >      >         above example, the pipeline crashed before launch:
         >      >
         >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
         >      >         had incompatible triggers:
         >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
         >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
         >      >
         >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
         >      >
         >      >             Hi Max,
         >      >
         >      >             I forgot to mention that the example is run in streaming mode,
         >      >             therefore I cannot do writes without specifying shards.
         >      >             FileIO explicitly asks for them.
         >      >
         >      >             I am not sure where the problem is. FlinkRunner is the only
         >      >             one I have used.
         >      >
         >      >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:
         >      >
         >      >                 Hi Jozef,
         >      >
         >      >                 This does not look like a FlinkRunner related problem, but
         >      >                 is caused by the `WriteFiles` sharding logic. It assigns keys
         >      >                 and does a Reshuffle which apparently does not lead to good
         >      >                 data spread in your case.
         >      >
         >      >                 Do you see the same behavior without `withNumShards(5)`?
         >      >
         >      >                 Thanks,
         >      >                 Max
         >      >
         >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
         >      >                  > Hello,
         >      >                  >
         >      >                  > I am having some trouble getting a balanced write via FileIO.
         >      >                  > Workers on the shuffle side, where the data fired per window is
         >      >                  > written to the filesystem, receive an unbalanced number of events.
         >      >                  >
         >      >                  > Here is a naive code example:
         >      >                  >
         >      >                  >      val read = KafkaIO.read()
         >      >                  >          .withTopic("topic")
         >      >                  >          .withBootstrapServers("kafka1:9092")
         >      >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
         >      >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
         >      >                  >          .withProcessingTime()
         >      >                  >
         >      >                  >      pipeline
         >      >                  >          .apply(read)
         >      >                  >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
         >      >                  >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
         >      >                  >              new String(input.getKV.getValue, "UTF-8")
         >      >                  >            }
         >      >                  >          }))
         >      >                  >
         >      >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
         >      >                  >              .triggering(AfterWatermark.pastEndOfWindow()
         >      >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
         >      >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
         >      >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
         >      >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
         >      >                  >              .discardingFiredPanes()
         >      >                  >              .withAllowedLateness(Duration.standardDays(7)))
         >      >                  >
         >      >                  >          .apply(FileIO.write()
         >      >                  >              .via(TextIO.sink())
         >      >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
         >      >                  >              .withTempDirectory(tempLocation)
         >      >                  >              .withNumShards(5))
         >      >                  >
         >      >                  >
         >      >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
         >      >                  > the number of shards), I would expect each worker to participate
         >      >                  > equally in persisting shards, since the code uses a fixed number of
         >      >                  > shards (and random shard assignment?). But reality is different
         >      >                  > (see 2 attachments: statistics from the Flink task reading from
         >      >                  > Kafka and the task writing to files).
         >      >                  >
         >      >                  > What am I missing? How can I achieve balanced writes?
         >      >                  >
         >      >                  > Thanks,
         >      >                  > Jozef
         >      >                  >
         >      >                  >
         >      >
         >
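
For intuition on the point quoted above about withNumShards(5) on 5 workers, here is a rough back-of-the-envelope sketch. It assumes each shard key lands on a worker independently and uniformly at random, which is only an approximation of how a runner maps hashed keys to partitions. The object and method names (`ShardSpread`, `expectedIdleWorkers`, `probAllWorkersBusy`) are made up for this illustration and are not part of Beam or Flink.

```scala
object ShardSpread {
  // Expected number of workers that receive no shard when `shards` shard keys
  // are assigned independently and uniformly at random to `workers` workers.
  // (Assumption: uniform, independent assignment; real runners only approximate this.)
  def expectedIdleWorkers(workers: Int, shards: Int): Double =
    workers * math.pow(1.0 - 1.0 / workers, shards)

  // Probability that every worker receives at least one shard,
  // computed by inclusion-exclusion over the set of empty workers.
  def probAllWorkersBusy(workers: Int, shards: Int): Double =
    (0 to workers).map { k =>
      val sign = if (k % 2 == 0) 1.0 else -1.0
      sign * binomial(workers, k) * math.pow(1.0 - k.toDouble / workers, shards)
    }.sum

  // Binomial coefficient C(n, k) as a Double.
  private def binomial(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  def main(args: Array[String]): Unit = {
    for (shards <- Seq(5, 50)) {
      val idle = expectedIdleWorkers(5, shards)
      val allBusy = probAllWorkersBusy(5, shards)
      println(f"shards=$shards%d expectedIdle=$idle%.4f probAllBusy=$allBusy%.4f")
    }
  }
}
```

Under that assumption, 5 shards on 5 workers leave about 1.64 workers idle on average, and all five workers are used only about 3.8% of the time (5!/5^5). With 50 shards an idle worker becomes very unlikely, although the number of shards per worker can still vary.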
