Actually, I don't think having the Runner set the number of shards will solve the problem. The shuffling logic still remains and, as observed by Jozef, it doesn't necessarily lead to balanced shards.

The sharding logic of the Beam IO is handy but it shouldn't be strictly necessary when the data is already partitioned nicely.

It seems the sharding logic is primarily necessary because there is no notion of a worker's ID in Beam. In Flink, you can retrieve the worker ID at runtime, and every worker just writes its results directly to a file suffixed with its worker ID. This avoids any GroupByKey or Reshuffle.
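
For illustration, a minimal sketch of that pattern with the plain Flink API (not Beam; the class name is made up):

    import org.apache.flink.api.common.functions.RichMapFunction

    // Each parallel subtask can ask the runtime for its own index and use it
    // as a shard suffix, so no key assignment or shuffle is needed.
    class TagWithWorkerId[T] extends RichMapFunction[T, (Int, T)] {
      override def map(value: T): (Int, T) =
        (getRuntimeContext.getIndexOfThisSubtask, value)
    }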

Robert, don't we already have Reshuffle, which can be overridden? However, it is not used by the WriteFiles code.


-Max

On 26.10.18 11:41, Robert Bradshaw wrote:
I think it's worth adding a URN for the operation of distributing "evenly" into an "appropriate" number of shards. A naive implementation would add random keys and do a ReshufflePerKey, but runners could override this to do a reshuffle and then key by whatever notion of bundle/worker/shard identifier they have that lines up with the number of actual workers.
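
A minimal sketch of that naive implementation in terms of the public Beam transforms (the `elements` input and `numShards` value are illustrative names):

    import java.util.concurrent.ThreadLocalRandom
    import org.apache.beam.sdk.transforms.{Reshuffle, SerializableFunction, Values, WithKeys}
    import org.apache.beam.sdk.values.{PCollection, TypeDescriptors}

    def distributeEvenly(elements: PCollection[String], numShards: Int): PCollection[String] =
      elements
        // Attach a random shard number in [0, numShards) to each element...
        .apply(WithKeys.of(new SerializableFunction[String, Integer] {
          override def apply(v: String): Integer =
            ThreadLocalRandom.current().nextInt(numShards)
        }).withKeyType(TypeDescriptors.integers()))
        // ...redistribute by that key...
        .apply(Reshuffle.of[Integer, String]())
        // ...and drop the keys again.
        .apply(Values.create[String]())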

On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

    Thanks for the JIRA. If I understand it correctly ... so
    runner-determined sharding will avoid the extra shuffle? Will it just
    write the worker-local data to its shard? Something similar to
    coalesce in Spark?

    On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <m...@apache.org> wrote:

        Oh ok, thanks for the pointer. Coming from Flink, the default is
        that the sharding is determined by the runtime distribution.
        Indeed, we will have to add an override to the Flink Runner,
        similar to this one:

        https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347

        Jira issue: https://issues.apache.org/jira/browse/BEAM-5865

        Thanks,
        Max

        On 25.10.18 22:37, Reuven Lax wrote:
          > FYI the Dataflow runner automatically sets the default number
          > of shards (I believe to be 2 * num_workers). Probably we should
          > do something similar for the Flink runner.
          >
          > This needs to be done by the runner, as # of workers is a runner
          > concept; the SDK itself has no concept of workers.
          >
          > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek
          > <jozo.vil...@gmail.com> wrote:
         >
          >     If I do not specify shards for an unbounded collection, I get
          >
          >     Caused by: java.lang.IllegalArgumentException: When applying
          >     WriteFiles to an unbounded PCollection, must specify number of
          >     output shards explicitly
          >             at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
          >             at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
         >
          >     Around the same lines in WriteFiles there is also a check for
          >     windowed writes. I believe FileIO enables it explicitly when
          >     windowing is present. On the filesystem, written files are per
          >     window and shard.
         >
          >     On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels
          >     <m...@apache.org> wrote:
         >
          >         I agree it would be nice to keep the current
          >         distribution of elements instead of doing a shuffle based
          >         on an artificial shard key.
          >
          >         Have you tried `withWindowedWrites()`? Also, why do you
          >         say you need to specify the number of shards in streaming
          >         mode?
          >
          >         -Max
         >
         >         On 25.10.18 10:12, Jozef Vilcek wrote:
          >          > Hm, yes, this makes sense now, but what can be done
          >          > for my case? I do not want to end up with too many
          >          > files on disk.
          >          >
          >          > I think what I am looking for is to instruct the IO
          >          > not to do the random sharding and reshuffle again, but
          >          > to assume the number of shards is equal to the number
          >          > of workers and the shard ID is the worker ID.
          >          > Is this doable in the Beam model?
          >          >
          >          > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels
          >          > <m...@apache.org> wrote:
         >          >
          >          >     The FlinkRunner uses a hash function (MurmurHash)
          >          >     on each key which places keys somewhere in the
          >          >     hash space. The hash space (2^32) is split among
          >          >     the partitions (5 in your case). Given enough keys,
          >          >     the chance increases that they are equally spread.
          >          >
          >          >     This should be similar to what the other Runners do.
          >          >
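          >          >     For illustration, this style of hash-range
          >          >     partitioning boils down to something like the
          >          >     following sketch (illustrative only, not Flink's
          >          >     actual code):
          >          >
          >          >         // Map a 32-bit hash onto [0, 2^32) and split
          >          >         // that range evenly among the partitions.
          >          >         def partitionFor(key: AnyRef, numPartitions: Int): Int = {
          >          >           val hash = key.hashCode.toLong - Int.MinValue.toLong // stand-in for MurmurHash
          >          >           ((hash * numPartitions) >>> 32).toInt
          >          >         }
          >          >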
         >          >     On 24.10.18 10:58, Jozef Vilcek wrote:
         >          >      >
          >          >      > So if I run 5 workers with 50 shards, I end up with:
          >          >      >
          >          >      >   Duration   Bytes received   Records received
          >          >      >   2m 39s     900 MB           465,525
          >          >      >   2m 39s     1.76 GB          930,720
          >          >      >   2m 39s     789 MB           407,315
          >          >      >   2m 39s     1.32 GB          698,262
          >          >      >   2m 39s     788 MB           407,310
          >          >      >
          >          >      > Still not good, but better than with 5 shards,
          >          >      > where some workers did not participate at all.
          >          >      > So, the problem is in some layer which
          >          >      > distributes keys / shards among workers?
          >          >      >
          >          >      > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax
          >          >      > <re...@google.com> wrote:
         >          >      >
          >          >      >     withNumShards(5) generates 5 random shards.
          >          >      >     It turns out that statistically, when you
          >          >      >     generate 5 random shards and you have 5
          >          >      >     workers, the probability is reasonably high
          >          >      >     that some workers will get more than one
          >          >      >     shard (and as a result not all workers will
          >          >      >     participate). Are you able to set the number
          >          >      >     of shards larger than 5?
          >          >      >
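          >          >      >     (For illustration, assuming shards land on
          >          >      >     workers uniformly at random: all 5 workers
          >          >      >     get exactly one shard only with probability
          >          >      >     5!/5^5 = 120/3125 ≈ 3.8%, so in roughly 96%
          >          >      >     of runs at least one worker gets no shard.)
          >          >      >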
          >          >      >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
          >          >      >     <jozo.vil...@gmail.com> wrote:
         >          >      >
         >          >      >         cc (dev)
         >          >      >
          >          >      >         I tried to run the example with
          >          >      >         FlinkRunner in batch mode and again
          >          >      >         received a bad data spread among the
          >          >      >         workers.
          >          >      >
          >          >      >         When I tried to remove the number of
          >          >      >         shards for batch mode in the above
          >          >      >         example, the pipeline crashed before
          >          >      >         launch:
          >          >      >
          >          >      >         Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
          >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
          >          >      >         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
          >          >      >
          >          >      >
          >          >      >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
          >          >      >         <jozo.vil...@gmail.com> wrote:
         >          >      >
         >          >      >             Hi Max,
         >          >      >
          >          >      >             I forgot to mention that the example
          >          >      >             is run in streaming mode, therefore I
          >          >      >             cannot do writes without specifying
          >          >      >             shards. FileIO explicitly asks for them.
          >          >      >
          >          >      >             I am not sure where the problem is.
          >          >      >             FlinkRunner is the only one I used.
         >          >      >
          >          >      >             On Tue, Oct 23, 2018 at 11:27 AM
          >          >      >             Maximilian Michels <m...@apache.org> wrote:
         >          >      >
         >          >      >                 Hi Jozef,
         >          >      >
          >          >      >                 This does not look like a
          >          >      >                 FlinkRunner-related problem, but
          >          >      >                 is caused by the `WriteFiles`
          >          >      >                 sharding logic. It assigns keys
          >          >      >                 and does a Reshuffle, which
          >          >      >                 apparently does not lead to a
          >          >      >                 good data spread in your case.
          >          >      >
          >          >      >                 Do you see the same behavior
          >          >      >                 without `withNumShards(5)`?
         >          >      >
         >          >      >                 Thanks,
         >          >      >                 Max
         >          >      >
          >          >      >                 On 22.10.18 11:57, Jozef Vilcek wrote:
          >          >      >                  > Hello,
          >          >      >                  >
          >          >      >                  > I am having some trouble getting
          >          >      >                  > a balanced write via FileIO.
          >          >      >                  > Workers at the shuffle side,
          >          >      >                  > where data per window fire are
          >          >      >                  > written to the filesystem,
          >          >      >                  > receive an unbalanced number of
          >          >      >                  > events.
          >          >      >                  >
          >          >      >                  > Here is a naive code example:
         >          >      >                  >
          >          >      >                  >     val read = KafkaIO.read()
          >          >      >                  >         .withTopic("topic")
          >          >      >                  >         .withBootstrapServers("kafka1:9092")
          >          >      >                  >         .withKeyDeserializer(classOf[ByteArrayDeserializer])
          >          >      >                  >         .withValueDeserializer(classOf[ByteArrayDeserializer])
          >          >      >                  >         .withProcessingTime()
          >          >      >                  >
          >          >      >                  >     pipeline
          >          >      >                  >         .apply(read)
          >          >      >                  >         .apply(MapElements.via(
          >          >      >                  >           new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
          >          >      >                  >             override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
          >          >      >                  >               new String(input.getKV.getValue, "UTF-8")
          >          >      >                  >             }
          >          >      >                  >           }))
          >          >      >                  >         .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
          >          >      >                  >           .triggering(AfterWatermark.pastEndOfWindow()
          >          >      >                  >             .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
          >          >      >                  >             .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
          >          >      >                  >               Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
          >          >      >                  >               Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          >          >      >                  >                 .plusDelayOf(Duration.standardHours(1)))))))
          >          >      >                  >           .discardingFiredPanes()
          >          >      >                  >           .withAllowedLateness(Duration.standardDays(7)))
          >          >      >                  >         .apply(FileIO.write()
          >          >      >                  >           .via(TextIO.sink())
          >          >      >                  >           .withNaming(new SafeFileNaming(outputPath, ".txt"))
          >          >      >                  >           .withTempDirectory(tempLocation)
          >          >      >                  >           .withNumShards(5))
         >          >      >                  >
         >          >      >                  >
          >          >      >                  > If I run this on Beam 2.6.0 with
          >          >      >                  > Flink 1.5.0 on 5 workers (equal
          >          >      >                  > to the number of shards), I would
          >          >      >                  > expect each worker to participate
          >          >      >                  > in persisting shards, and equally
          >          >      >                  > so, since the code uses a fixed
          >          >      >                  > number of shards (and random
          >          >      >                  > shard assignment?). But reality
          >          >      >                  > is different (see the 2
          >          >      >                  > attachments - statistics from the
          >          >      >                  > Flink task reading from Kafka and
          >          >      >                  > the task writing to files).
          >          >      >                  >
          >          >      >                  > What am I missing? How can I
          >          >      >                  > achieve balanced writes?
          >          >      >                  >
          >          >      >                  > Thanks,
          >          >      >                  > Jozef
         >          >      >                  >
         >          >      >                  >
         >          >      >
         >          >
         >
