I think it's worth adding a URN for the operation of distributing "evenly" into an "appropriate" number of shards. A naive implementation would assign random keys and do a ReshufflePerKey, but runners could override this to do a reshuffle and then key by whatever notion of bundle/worker/shard identifier they have that lines up with the number of actual workers.
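
A minimal sketch of what that naive expansion could look like (Beam's Java SDK used from Scala, matching the example later in the thread; distributeIntoShards is an illustrative name, not an existing Beam transform or URN):

    import java.util.concurrent.ThreadLocalRandom

    import org.apache.beam.sdk.transforms.{DoFn, ParDo, Reshuffle, Values}
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.values.{KV, PCollection}

    // Assign each element a random shard key, reshuffle so elements get
    // redistributed by that key, then drop the key again.
    def distributeIntoShards(input: PCollection[String], numShards: Int): PCollection[String] =
      input
        .apply("AssignRandomShard", ParDo.of(new DoFn[String, KV[Integer, String]]() {
          @ProcessElement
          def processElement(c: DoFn[String, KV[Integer, String]]#ProcessContext): Unit =
            c.output(KV.of(Int.box(ThreadLocalRandom.current().nextInt(numShards)), c.element()))
        }))
        .apply(Reshuffle.of[Integer, String]())
        .apply(Values.create[String]())

A runner override would swap the random key for its own worker/shard identifier, so elements stay where they already are.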
On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
Thanks for the JIRA. If I understand it correctly ... so runner-determined sharding will avoid the extra shuffle? Will it just write the data locally available on a worker to its shard? Something similar to coalesce in Spark?
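
(For reference, the Spark operation alluded to here, which merges partitions that are already co-located instead of shuffling; `rdd` stands for any Spark RDD:)

    // Spark: shrink to 5 partitions; with shuffle = false each output
    // partition is assembled from input partitions local to a worker.
    val coalesced = rdd.coalesce(5, shuffle = false)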
On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <m...@apache.org> wrote:
Oh ok, thanks for the pointer. Coming from Flink, the default is that the sharding is determined by the runtime distribution. Indeed, we will have to add an override to the Flink Runner, similar to this one:
https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347

Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
Thanks,
Max
On 25.10.18 22:37, Reuven Lax wrote:
FYI the Dataflow runner automatically sets the default number of shards (I believe to 2 * num_workers). Probably we should do something similar for the Flink runner.

This needs to be done by the runner, as the number of workers is a runner concept; the SDK itself has no concept of workers.
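
For illustration, a hedged sketch of what the Flink analogue could look like at pipeline-construction time (not actual runner code; FlinkPipelineOptions.getParallelism is real API, the rest is an assumption, with `write` standing for a FileIO.Write):

    import org.apache.beam.runners.flink.FlinkPipelineOptions

    val flinkOptions = pipeline.getOptions.as(classOf[FlinkPipelineOptions])
    // getParallelism returns -1 when unset, so guard before deriving a count.
    val defaultNumShards = 2 * math.max(1, flinkOptions.getParallelism)
    write.withNumShards(defaultNumShards)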

On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

If I do not specify shards for an unbounded collection, I get:

    Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
        at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)

Around the same lines in WriteFiles there is also a check for windowed writes. I believe FileIO enables them explicitly when windowing is present. On the filesystem, written files are per window and shard.

On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <m...@apache.org> wrote:

I agree it would be nice to keep the current distribution of elements instead of doing a shuffle based on an artificial shard key.

Have you tried `withWindowedWrites()`? Also, why do you say you need to specify the number of shards in streaming mode?

-Max

On 25.10.18 10:12, Jozef Vilcek wrote:
Hm, yes, this makes sense now, but what can be done for my case? I do not want to end up with too many files on disk.

I think what I am looking for is a way to instruct the IO not to do the random sharding and reshuffle again, but to assume that the number of shards equals the number of workers and the shard ID is a worker ID. Is this doable in the Beam model?

On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <m...@apache.org> wrote:

The FlinkRunner uses a hash function (MurmurHash) on each key, which places keys somewhere in the hash space. The hash space (2^32) is split among the partitions (5 in your case). Given enough keys, the chance increases that they are spread equally.

This should be similar to what the other Runners do.
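
For intuition, a simplified model of that assignment (inspired by Flink's key-group assignment; not the exact runner code):

    import scala.util.hashing.MurmurHash3

    // Hash each shard key and map the hash onto one of the partitions.
    def partitionFor(shardKey: Int, numPartitions: Int): Int =
      Math.floorMod(MurmurHash3.stringHash(shardKey.toString), numPartitions)

    // With only 5 distinct shard keys and 5 partitions, collisions are likely:
    // any duplicates in this list mean some partition receives no shard at all.
    (0 until 5).map(partitionFor(_, 5))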

On 24.10.18 10:58, Jozef Vilcek wrote:

So if I run 5 workers with 50 shards, I end up with:

    Duration    Bytes received    Records received
    2m 39s      900 MB            465,525
    2m 39s      1.76 GB           930,720
    2m 39s      789 MB            407,315
    2m 39s      1.32 GB           698,262
    2m 39s      788 MB            407,310

Still not good, but better than with 5 shards, where some workers did not participate at all. So the problem is in some layer which distributes keys/shards among workers?

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:

withNumShards(5) generates 5 random shards. It turns out that, statistically, when you generate 5 random shards and you have 5 workers, the probability is reasonably high that some workers will get more than one shard (and as a result not all workers will participate). Are you able to set the number of shards larger than 5?
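
A quick sanity check of that claim, modelling the shard-to-worker assignment as uniformly random: the probability that all 5 workers receive at least one of the 5 shards is 5!/5^5:

    // P(each of 5 workers gets exactly one of 5 random shards) = 5!/5^5
    val allBusy = (1 to 5).product.toDouble / math.pow(5, 5) // = 120/3125 = 0.0384

So in roughly 96% of runs at least one worker gets no shard and sits idle during the write.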

On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

cc (dev)

I tried to run the example with FlinkRunner in batch mode and again received a bad data spread among the workers.

When I tried to remove the number of shards for batch mode in the above example, the pipeline crashed before launch:

    Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
    AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
    AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))

On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

Hi Max,

I forgot to mention that the example runs in streaming mode, therefore I cannot do writes without specifying shards; FileIO explicitly asks for them.

I am not sure where the problem is. FlinkRunner is the only one I used.

On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:

Hi Jozef,

This does not look like a FlinkRunner-related problem, but is caused by the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle, which apparently does not lead to a good data spread in your case.

Do you see the same behavior without `withNumShards(5)`?

Thanks,
Max

On 22.10.18 11:57, Jozef Vilcek wrote:
Hello,

I am having some trouble getting a balanced write via FileIO. The workers on the shuffle side, where data per window fire and are written to the filesystem, receive an unbalanced number of events.

Here is a naive code example:

    val read = KafkaIO.read()
      .withTopic("topic")
      .withBootstrapServers("kafka1:9092")
      .withKeyDeserializer(classOf[ByteArrayDeserializer])
      .withValueDeserializer(classOf[ByteArrayDeserializer])
      .withProcessingTime()

    pipeline
      .apply(read)
      .apply(MapElements.via(
        new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
          override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
            new String(input.getKV.getValue, "UTF-8")
          }
        }))
      .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
          .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
            Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
            Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardHours(1)))))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(7)))
      .apply(FileIO.write()
        .via(TextIO.sink())
        .withNaming(new SafeFileNaming(outputPath, ".txt"))
        .withTempDirectory(tempLocation)
        .withNumShards(5))

If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the number of shards), I would expect each worker to participate in persisting the shards, and equally so, since the code uses a fixed number of shards (and random shard assignment?). But reality is different (see the 2 attachments with statistics from the Flink task reading from Kafka and the task writing to files).

What am I missing? How can I achieve balanced writes?

Thanks,
Jozef